Merge branch 'master' of github.com:metamx/druid

This commit is contained in:
fjy 2014-03-07 17:03:28 -08:00
commit a973917340
77 changed files with 2443 additions and 470 deletions

View File

@ -0,0 +1,25 @@
---
layout: doc_page
---
A data source is the Druid equivalent of a database table. However, a query can also masquerade as a data source, providing subquery-like functionality. Query data sources are currently only supported by [GroupBy](GroupByQuery.html) queries.
### Table Data Source
The table data source the most common type. It's represented by a string, or by the full structure:
```json
{
"type": "table",
"name": <string_value>
}
```
### Query Data Source
```json
{
"type": "query",
"query": {
"type": "groupBy",
...
}
}
```

View File

@ -48,7 +48,7 @@ There are 9 main parts to a groupBy query:
|property|description|required?|
|--------|-----------|---------|
|queryType|This String should always be "groupBy"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
|dataSource|A String defining the data source to query, very similar to a table in a relational database|yes|
|dataSource|A String defining the data source to query, very similar to a table in a relational database, or a [DataSource](DataSource.html) structure.|yes|
|dimensions|A JSON list of dimensions to do the groupBy over|yes|
|orderBy|See [OrderBy](OrderBy.html).|no|
|having|See [Having](Having.html).|no|

View File

@ -90,7 +90,7 @@ public class IndexerDBCoordinator
final ResultIterator<Map<String, Object>> dbSegments =
handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE used = 1 AND dataSource = :dataSource",
"SELECT payload FROM %s WHERE used = true AND dataSource = :dataSource",
dbTables.getSegmentsTable()
)
)
@ -304,8 +304,8 @@ public class IndexerDBCoordinator
return handle.createQuery(
String.format(
DbConnector.isPostgreSQL(handle)?
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = 0":
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0",
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = false":
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = false",
dbTables.getSegmentsTable()
)
)

View File

@ -38,6 +38,7 @@ import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.task.Task;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.TableDataSource;
import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.SegmentDescriptor;
@ -152,10 +153,17 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
{
QueryRunner<T> queryRunner = null;
String queryDataSource;
try {
queryDataSource = ((TableDataSource)query.getDataSource()).getName();
}
catch (ClassCastException e) {
throw new IllegalArgumentException("Subqueries are not welcome here");
}
for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) {
final Task task = taskRunnerWorkItem.getTask();
if (task.getDataSource().equals(query.getDataSource())) {
if (task.getDataSource().equals(queryDataSource)) {
final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);
if (taskQueryRunner != null) {
@ -163,7 +171,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
queryRunner = taskQueryRunner;
} else {
log.makeAlert("Found too many query runners for datasource")
.addData("dataSource", query.getDataSource())
.addData("dataSource", queryDataSource)
.emit();
}
}

View File

@ -36,13 +36,13 @@ import java.util.Map;
public abstract class BaseQuery<T> implements Query<T>
{
public static String QUERYID = "queryId";
private final String dataSource;
private final DataSource dataSource;
private final Map<String, String> context;
private final QuerySegmentSpec querySegmentSpec;
private volatile Duration duration;
public BaseQuery(
String dataSource,
DataSource dataSource,
QuerySegmentSpec querySegmentSpec,
Map<String, String> context
)
@ -50,14 +50,14 @@ public abstract class BaseQuery<T> implements Query<T>
Preconditions.checkNotNull(dataSource, "dataSource can't be null");
Preconditions.checkNotNull(querySegmentSpec, "querySegmentSpec can't be null");
this.dataSource = dataSource.toLowerCase();
this.dataSource = dataSource;
this.context = context;
this.querySegmentSpec = querySegmentSpec;
}
@JsonProperty
@Override
public String getDataSource()
public DataSource getDataSource()
{
return dataSource;
}
@ -143,4 +143,31 @@ public abstract class BaseQuery<T> implements Query<T>
{
return withOverriddenContext(ImmutableMap.of(QUERYID, id));
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BaseQuery baseQuery = (BaseQuery) o;
if (context != null ? !context.equals(baseQuery.context) : baseQuery.context != null) return false;
if (dataSource != null ? !dataSource.equals(baseQuery.dataSource) : baseQuery.dataSource != null) return false;
if (duration != null ? !duration.equals(baseQuery.duration) : baseQuery.duration != null) return false;
if (querySegmentSpec != null ? !querySegmentSpec.equals(baseQuery.querySegmentSpec) : baseQuery.querySegmentSpec != null)
return false;
return true;
}
@Override
public int hashCode()
{
int result = dataSource != null ? dataSource.hashCode() : 0;
result = 31 * result + (context != null ? context.hashCode() : 0);
result = 31 * result + (querySegmentSpec != null ? querySegmentSpec.hashCode() : 0);
result = 31 * result + (duration != null ? duration.hashCode() : 0);
return result;
}
}

View File

@ -0,0 +1,37 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/
package io.druid.query;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type",
defaultImpl = LegacyDataSource.class)
@JsonSubTypes({
@JsonSubTypes.Type(value = TableDataSource.class, name = "table"),
@JsonSubTypes.Type(value = QueryDataSource.class, name = "query")
})
public interface DataSource
{
}

View File

@ -298,7 +298,7 @@ public class Druids
*/
public static class TimeseriesQueryBuilder
{
private String dataSource;
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
private DimFilter dimFilter;
private QueryGranularity granularity;
@ -308,7 +308,7 @@ public class Druids
private TimeseriesQueryBuilder()
{
dataSource = "";
dataSource = null;
querySegmentSpec = null;
dimFilter = null;
granularity = QueryGranularity.ALL;
@ -354,7 +354,7 @@ public class Druids
.context(builder.context);
}
public String getDataSource()
public DataSource getDataSource()
{
return dataSource;
}
@ -390,6 +390,12 @@ public class Druids
}
public TimeseriesQueryBuilder dataSource(String ds)
{
dataSource = new TableDataSource(ds);
return this;
}
public TimeseriesQueryBuilder dataSource(DataSource ds)
{
dataSource = ds;
return this;
@ -492,7 +498,7 @@ public class Druids
*/
public static class SearchQueryBuilder
{
private String dataSource;
private DataSource dataSource;
private DimFilter dimFilter;
private QueryGranularity granularity;
private int limit;
@ -503,7 +509,7 @@ public class Druids
public SearchQueryBuilder()
{
dataSource = "";
dataSource = null;
dimFilter = null;
granularity = QueryGranularity.ALL;
limit = 0;
@ -531,7 +537,7 @@ public class Druids
public SearchQueryBuilder copy(SearchQuery query)
{
return new SearchQueryBuilder()
.dataSource(query.getDataSource())
.dataSource(((TableDataSource)query.getDataSource()).getName())
.intervals(query.getQuerySegmentSpec())
.filters(query.getDimensionsFilter())
.granularity(query.getGranularity())
@ -555,6 +561,12 @@ public class Druids
}
public SearchQueryBuilder dataSource(String d)
{
dataSource = new TableDataSource(d);
return this;
}
public SearchQueryBuilder dataSource(DataSource d)
{
dataSource = d;
return this;
@ -676,13 +688,13 @@ public class Druids
*/
public static class TimeBoundaryQueryBuilder
{
private String dataSource;
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
private Map<String, String> context;
public TimeBoundaryQueryBuilder()
{
dataSource = "";
dataSource = null;
querySegmentSpec = null;
context = null;
}
@ -704,9 +716,15 @@ public class Druids
.context(builder.context);
}
public TimeBoundaryQueryBuilder dataSource(String d)
public TimeBoundaryQueryBuilder dataSource(String ds)
{
dataSource = d;
dataSource = new TableDataSource(ds);
return this;
}
public TimeBoundaryQueryBuilder dataSource(DataSource ds)
{
dataSource = ds;
return this;
}

View File

@ -0,0 +1,35 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/
package io.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("table")
public class LegacyDataSource extends TableDataSource
{
@JsonCreator
public LegacyDataSource(String name)
{
super(name);
}
}

View File

@ -56,7 +56,7 @@ public interface Query<T>
public static final String SELECT = "select";
public static final String TOPN = "topN";
public String getDataSource();
public DataSource getDataSource();
public boolean hasFilters();

View File

@ -0,0 +1,65 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/
package io.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("query")
public class QueryDataSource implements DataSource
{
@JsonProperty
private final Query query;
@JsonCreator
public QueryDataSource(@JsonProperty("query") Query query)
{
this.query = query;
}
public Query getQuery()
{
return query;
}
public String toString() { return query.toString(); }
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
QueryDataSource that = (QueryDataSource) o;
if (!query.equals(that.query)) return false;
return true;
}
@Override
public int hashCode()
{
return query.hashCode();
}
}

View File

@ -0,0 +1,48 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/
package io.druid.query;
import com.metamx.common.guava.Sequence;
/**
* If there's a subquery, run it instead of the outer query
*/
public class SubqueryQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> baseRunner;
public SubqueryQueryRunner(QueryRunner<T> baseRunner)
{
this.baseRunner = baseRunner;
}
@Override
public Sequence<T> run(final Query<T> query)
{
DataSource dataSource = query.getDataSource();
if (dataSource instanceof QueryDataSource) {
return run((Query<T>) ((QueryDataSource) dataSource).getQuery());
} else {
return baseRunner.run(query);
}
}
}

View File

@ -0,0 +1,64 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/
package io.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("table")
public class TableDataSource implements DataSource
{
@JsonProperty
private final String name;
@JsonCreator
public TableDataSource(@JsonProperty("name") String name)
{
this.name = (name == null ? null : name.toLowerCase());
}
public String getName()
{
return name;
}
public String toString() { return name; }
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (!(o instanceof TableDataSource)) return false;
TableDataSource that = (TableDataSource) o;
if (!name.equals(that.name)) return false;
return true;
}
@Override
public int hashCode()
{
return name.hashCode();
}
}

View File

@ -132,4 +132,23 @@ public class CountAggregatorFactory implements AggregatorFactory
"name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CountAggregatorFactory that = (CountAggregatorFactory) o;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
return name != null ? name.hashCode() : 0;
}
}

View File

@ -150,4 +150,26 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DoubleSumAggregatorFactory that = (DoubleSumAggregatorFactory) o;
if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = fieldName != null ? fieldName.hashCode() : 0;
result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}
}

View File

@ -179,4 +179,30 @@ public class HistogramAggregatorFactory implements AggregatorFactory
", breaks=" + Arrays.toString(breaks) +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
HistogramAggregatorFactory that = (HistogramAggregatorFactory) o;
if (!Arrays.equals(breaks, that.breaks)) return false;
if (breaksList != null ? !breaksList.equals(that.breaksList) : that.breaksList != null) return false;
if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (fieldName != null ? fieldName.hashCode() : 0);
result = 31 * result + (breaksList != null ? breaksList.hashCode() : 0);
result = 31 * result + (breaks != null ? Arrays.hashCode(breaks) : 0);
return result;
}
}

View File

@ -317,4 +317,35 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
}
};
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JavaScriptAggregatorFactory that = (JavaScriptAggregatorFactory) o;
if (compiledScript != null ? !compiledScript.equals(that.compiledScript) : that.compiledScript != null)
return false;
if (fieldNames != null ? !fieldNames.equals(that.fieldNames) : that.fieldNames != null) return false;
if (fnAggregate != null ? !fnAggregate.equals(that.fnAggregate) : that.fnAggregate != null) return false;
if (fnCombine != null ? !fnCombine.equals(that.fnCombine) : that.fnCombine != null) return false;
if (fnReset != null ? !fnReset.equals(that.fnReset) : that.fnReset != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (fieldNames != null ? fieldNames.hashCode() : 0);
result = 31 * result + (fnAggregate != null ? fnAggregate.hashCode() : 0);
result = 31 * result + (fnReset != null ? fnReset.hashCode() : 0);
result = 31 * result + (fnCombine != null ? fnCombine.hashCode() : 0);
result = 31 * result + (compiledScript != null ? compiledScript.hashCode() : 0);
return result;
}
}

View File

@ -150,4 +150,26 @@ public class LongSumAggregatorFactory implements AggregatorFactory
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LongSumAggregatorFactory that = (LongSumAggregatorFactory) o;
if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = fieldName != null ? fieldName.hashCode() : 0;
result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}
}

View File

@ -150,4 +150,26 @@ public class MaxAggregatorFactory implements AggregatorFactory
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MaxAggregatorFactory that = (MaxAggregatorFactory) o;
if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = fieldName != null ? fieldName.hashCode() : 0;
result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}
}

View File

@ -150,4 +150,26 @@ public class MinAggregatorFactory implements AggregatorFactory
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MinAggregatorFactory that = (MinAggregatorFactory) o;
if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = fieldName != null ? fieldName.hashCode() : 0;
result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}
}

View File

@ -112,4 +112,24 @@ public class ToLowerCaseAggregatorFactory implements AggregatorFactory
{
return baseAggregatorFactory.getAggregatorStartValue();
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ToLowerCaseAggregatorFactory that = (ToLowerCaseAggregatorFactory) o;
if (baseAggregatorFactory != null ? !baseAggregatorFactory.equals(that.baseAggregatorFactory) : that.baseAggregatorFactory != null)
return false;
return true;
}
@Override
public int hashCode()
{
return baseAggregatorFactory != null ? baseAggregatorFactory.hashCode() : 0;
}
}

View File

@ -207,4 +207,26 @@ public class HyperUniquesAggregatorFactory implements AggregatorFactory
", fieldName='" + fieldName + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
HyperUniquesAggregatorFactory that = (HyperUniquesAggregatorFactory) o;
if (!fieldName.equals(that.fieldName)) return false;
if (!name.equals(that.name)) return false;
return true;
}
@Override
public int hashCode()
{
int result = name.hashCode();
result = 31 * result + fieldName.hashCode();
return result;
}
}

View File

@ -193,4 +193,30 @@ public class ArithmeticPostAggregator implements PostAggregator
return lookupMap.keySet();
}
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ArithmeticPostAggregator that = (ArithmeticPostAggregator) o;
if (fields != null ? !fields.equals(that.fields) : that.fields != null) return false;
if (fnName != null ? !fnName.equals(that.fnName) : that.fnName != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
if (op != that.op) return false;
return true;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (fnName != null ? fnName.hashCode() : 0);
result = 31 * result + (fields != null ? fields.hashCode() : 0);
result = 31 * result + (op != null ? op.hashCode() : 0);
return result;
}
}

View File

@ -91,4 +91,33 @@ public class ConstantPostAggregator implements PostAggregator
", constantValue=" + constantValue +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ConstantPostAggregator that = (ConstantPostAggregator) o;
if (constantValue != null && that.constantValue != null) {
if (constantValue.doubleValue() != that.constantValue.doubleValue())
return false;
}
else if (constantValue != that.constantValue) {
return false;
}
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (constantValue != null ? constantValue.hashCode() : 0);
return result;
}
}

View File

@ -84,4 +84,26 @@ public class FieldAccessPostAggregator implements PostAggregator
", fieldName='" + fieldName + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FieldAccessPostAggregator that = (FieldAccessPostAggregator) o;
if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (fieldName != null ? fieldName.hashCode() : 0);
return result;
}
}

View File

@ -142,4 +142,30 @@ public class JavaScriptPostAggregator implements PostAggregator
{
return function;
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JavaScriptPostAggregator that = (JavaScriptPostAggregator) o;
if (fieldNames != null ? !fieldNames.equals(that.fieldNames) : that.fieldNames != null) return false;
if (fn != null ? !fn.equals(that.fn) : that.fn != null) return false;
if (function != null ? !function.equals(that.function) : that.function != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (fieldNames != null ? fieldNames.hashCode() : 0);
result = 31 * result + (function != null ? function.hashCode() : 0);
result = 31 * result + (fn != null ? fn.hashCode() : 0);
return result;
}
}

View File

@ -84,4 +84,26 @@ public class DefaultDimensionSpec implements DimensionSpec
", outputName='" + outputName + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DefaultDimensionSpec that = (DefaultDimensionSpec) o;
if (dimension != null ? !dimension.equals(that.dimension) : that.dimension != null) return false;
if (outputName != null ? !outputName.equals(that.outputName) : that.outputName != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = dimension != null ? dimension.hashCode() : 0;
result = 31 * result + (outputName != null ? outputName.hashCode() : 0);
return result;
}
}

View File

@ -92,4 +92,29 @@ public class ExtractionDimensionSpec implements DimensionSpec
", outputName='" + outputName + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ExtractionDimensionSpec that = (ExtractionDimensionSpec) o;
if (dimExtractionFn != null ? !dimExtractionFn.equals(that.dimExtractionFn) : that.dimExtractionFn != null)
return false;
if (dimension != null ? !dimension.equals(that.dimension) : that.dimension != null) return false;
if (outputName != null ? !outputName.equals(that.outputName) : that.outputName != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = dimension != null ? dimension.hashCode() : 0;
result = 31 * result + (dimExtractionFn != null ? dimExtractionFn.hashCode() : 0);
result = 31 * result + (outputName != null ? outputName.hashCode() : 0);
return result;
}
}

View File

@ -33,7 +33,11 @@ import com.metamx.common.guava.Sequences;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Queries;
import io.druid.query.Query;
import io.druid.query.QueryDataSource;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
@ -72,7 +76,7 @@ public class GroupByQuery extends BaseQuery<Row>
@JsonCreator
public GroupByQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("granularity") QueryGranularity granularity,
@ -133,7 +137,7 @@ public class GroupByQuery extends BaseQuery<Row>
* have already passed in order for the object to exist.
*/
private GroupByQuery(
String dataSource,
DataSource dataSource,
QuerySegmentSpec querySegmentSpec,
DimFilter dimFilter,
QueryGranularity granularity,
@ -255,7 +259,7 @@ public class GroupByQuery extends BaseQuery<Row>
public static class Builder
{
private String dataSource;
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
private DimFilter dimFilter;
private QueryGranularity granularity;
@ -270,7 +274,9 @@ public class GroupByQuery extends BaseQuery<Row>
private List<OrderByColumnSpec> orderByColumnSpecs = Lists.newArrayList();
private int limit = Integer.MAX_VALUE;
private Builder() {}
private Builder()
{
}
private Builder(Builder builder)
{
@ -288,12 +294,24 @@ public class GroupByQuery extends BaseQuery<Row>
context = builder.context;
}
public Builder setDataSource(String dataSource)
public Builder setDataSource(DataSource dataSource)
{
this.dataSource = dataSource;
return this;
}
public Builder setDataSource(String dataSource)
{
this.dataSource = new TableDataSource(dataSource);
return this;
}
public Builder setDataSource(Query query)
{
this.dataSource = new QueryDataSource(query);
return this;
}
public Builder setInterval(Object interval)
{
return setQuerySegmentSpec(new LegacySegmentSpec(interval));
@ -479,13 +497,52 @@ public class GroupByQuery extends BaseQuery<Row>
public String toString()
{
return "GroupByQuery{" +
"limitSpec=" + limitSpec +
", dimFilter=" + dimFilter +
", granularity=" + granularity +
", dimensions=" + dimensions +
", aggregatorSpecs=" + aggregatorSpecs +
", postAggregatorSpecs=" + postAggregatorSpecs +
", orderByLimitFn=" + orderByLimitFn +
'}';
"limitSpec=" + limitSpec +
", dimFilter=" + dimFilter +
", granularity=" + granularity +
", dimensions=" + dimensions +
", aggregatorSpecs=" + aggregatorSpecs +
", postAggregatorSpecs=" + postAggregatorSpecs +
", orderByLimitFn=" + orderByLimitFn +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
GroupByQuery that = (GroupByQuery) o;
if (aggregatorSpecs != null ? !aggregatorSpecs.equals(that.aggregatorSpecs) : that.aggregatorSpecs != null)
return false;
if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) return false;
if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) return false;
if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) return false;
if (havingSpec != null ? !havingSpec.equals(that.havingSpec) : that.havingSpec != null) return false;
if (limitSpec != null ? !limitSpec.equals(that.limitSpec) : that.limitSpec != null) return false;
if (orderByLimitFn != null ? !orderByLimitFn.equals(that.orderByLimitFn) : that.orderByLimitFn != null)
return false;
if (postAggregatorSpecs != null ? !postAggregatorSpecs.equals(that.postAggregatorSpecs) : that.postAggregatorSpecs != null)
return false;
return true;
}
@Override
public int hashCode()
{
int result = super.hashCode();
result = 31 * result + (limitSpec != null ? limitSpec.hashCode() : 0);
result = 31 * result + (havingSpec != null ? havingSpec.hashCode() : 0);
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0);
result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0);
result = 31 * result + (orderByLimitFn != null ? orderByLimitFn.hashCode() : 0);
return result;
}
}

View File

@ -24,9 +24,6 @@ import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;

View File

@ -34,13 +34,17 @@ import com.metamx.common.guava.Sequences;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.query.DataSource;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.SubqueryQueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.joda.time.Interval;
import org.joda.time.Minutes;
@ -56,13 +60,16 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
private static final String GROUP_BY_MERGE_KEY = "groupByMerge";
private static final Map<String, String> NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false");
private final Supplier<GroupByQueryConfig> configSupplier;
private GroupByQueryEngine engine; // For running the outer query around a subquery
@Inject
public GroupByQueryQueryToolChest(
Supplier<GroupByQueryConfig> configSupplier
Supplier<GroupByQueryConfig> configSupplier,
GroupByQueryEngine engine
)
{
this.configSupplier = configSupplier;
this.engine = engine;
}
@Override
@ -84,13 +91,32 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner)
{
final GroupByQueryConfig config = configSupplier.get();
Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query,
config
);
IncrementalIndex index = runner.run(query).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
Sequence<Row> result;
// If there's a subquery, merge subquery results and then apply the aggregator
DataSource dataSource = query.getDataSource();
if (dataSource instanceof QueryDataSource) {
GroupByQuery subquery;
try {
subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery();
} catch (ClassCastException e) {
throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
}
Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner);
IncrementalIndexStorageAdapter adapter
= new IncrementalIndexStorageAdapter(makeIncrementalIndex(subquery, subqueryResult));
result = engine.process(query, adapter);
} else {
result = runner.run(query);
}
return postAggregate(query, makeIncrementalIndex(query, result));
}
private Sequence<Row> postAggregate(final GroupByQuery query, IncrementalIndex index)
{
Sequence<Row> sequence = Sequences.map(
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
new Function<Row, Row>()
@ -101,7 +127,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
final MapBasedRow row = (MapBasedRow) input;
return new MapBasedRow(
query.getGranularity()
.toDateTime(row.getTimestampFromEpoch()),
.toDateTime(row.getTimestampFromEpoch()),
row.getEvent()
);
}
@ -110,6 +136,18 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
return query.applyLimit(sequence);
}
private IncrementalIndex makeIncrementalIndex(GroupByQuery query, Sequence<Row> rows)
{
final GroupByQueryConfig config = configSupplier.get();
Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query,
config
);
return rows.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
}
@Override
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
{
@ -125,7 +163,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
}
return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
.setUser2(query.getDataSource().toString())
.setUser3(String.format("%,d dims", query.getDimensions().size()))
.setUser4("groupBy")
.setUser5(Joiner.on(",").join(query.getIntervals()))
@ -165,6 +203,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
@Override
public QueryRunner<Row> preMergeQueryDecoration(QueryRunner<Row> runner)
{
return new IntervalChunkingQueryRunner<Row>(runner, configSupplier.get().getChunkPeriod());
return new SubqueryQueryRunner<Row>(
new IntervalChunkingQueryRunner<Row>(runner, configSupplier.get().getChunkPeriod()));
}
}

View File

@ -196,6 +196,25 @@ public class DefaultLimitSpec implements LimitSpec
{
return Sequences.limit(input, limit);
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LimitingFn that = (LimitingFn) o;
if (limit != that.limit) return false;
return true;
}
@Override
public int hashCode()
{
return limit;
}
}
private static class SortingFn implements Function<Sequence<Row>, Sequence<Row>>
@ -209,6 +228,25 @@ public class DefaultLimitSpec implements LimitSpec
{
return Sequences.sort(input, ordering);
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SortingFn sortingFn = (SortingFn) o;
if (ordering != null ? !ordering.equals(sortingFn.ordering) : sortingFn.ordering != null) return false;
return true;
}
@Override
public int hashCode()
{
return ordering != null ? ordering.hashCode() : 0;
}
}
private static class TopNFunction implements Function<Sequence<Row>, Sequence<Row>>
@ -231,5 +269,49 @@ public class DefaultLimitSpec implements LimitSpec
final ArrayList<Row> materializedList = Sequences.toList(input, Lists.<Row>newArrayList());
return Sequences.simple(sorter.toTopN(materializedList, limit));
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TopNFunction that = (TopNFunction) o;
if (limit != that.limit) return false;
if (sorter != null ? !sorter.equals(that.sorter) : that.sorter != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = sorter != null ? sorter.hashCode() : 0;
result = 31 * result + limit;
return result;
}
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DefaultLimitSpec that = (DefaultLimitSpec) o;
if (limit != that.limit) return false;
if (columns != null ? !columns.equals(that.columns) : that.columns != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = columns != null ? columns.hashCode() : 0;
result = 31 * result + limit;
return result;
}
}

View File

@ -46,4 +46,15 @@ public class NoopLimitSpec implements LimitSpec
{
return "NoopLimitSpec";
}
@Override
public boolean equals(Object other)
{
return (other instanceof NoopLimitSpec);
}
@Override
public int hashCode() {
return 0;
}
}

View File

@ -147,7 +147,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
}
return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
.setUser2(query.getDataSource().toString())
.setUser4(query.getType())
.setUser5(Joiner.on(",").join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
@ -179,9 +179,9 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
{
byte[] includerBytes = query.getToInclude().getCacheKey();
return ByteBuffer.allocate(1 + includerBytes.length)
.put(SEGMENT_METADATA_CACHE_PREFIX)
.put(includerBytes)
.array();
.put(SEGMENT_METADATA_CACHE_PREFIX)
.put(includerBytes)
.array();
}
@Override

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.query.BaseQuery;
import io.druid.query.Query;
import io.druid.query.TableDataSource;
import io.druid.query.spec.QuerySegmentSpec;
import java.util.Map;
@ -42,7 +43,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
@JsonProperty("context") Map<String, String> context
)
{
super(dataSource, querySegmentSpec, context);
super(new TableDataSource(dataSource), querySegmentSpec, context);
this.toInclude = toInclude == null ? new AllColumnIncluderator() : toInclude;
this.merge = merge == null ? false : merge;
@ -76,13 +77,40 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
public Query<SegmentAnalysis> withOverriddenContext(Map<String, String> contextOverride)
{
return new SegmentMetadataQuery(
getDataSource(), getQuerySegmentSpec(), toInclude, merge, computeOverridenContext(contextOverride)
((TableDataSource)getDataSource()).getName(),
getQuerySegmentSpec(), toInclude, merge, computeOverridenContext(contextOverride)
);
}
@Override
public Query<SegmentAnalysis> withQuerySegmentSpec(QuerySegmentSpec spec)
{
return new SegmentMetadataQuery(getDataSource(), spec, toInclude, merge, getContext());
return new SegmentMetadataQuery(
((TableDataSource)getDataSource()).getName(),
spec, toInclude, merge, getContext());
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
SegmentMetadataQuery that = (SegmentMetadataQuery) o;
if (merge != that.merge) return false;
if (toInclude != null ? !toInclude.equals(that.toInclude) : that.toInclude != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = super.hashCode();
result = 31 * result + (toInclude != null ? toInclude.hashCode() : 0);
result = 31 * result + (merge ? 1 : 0);
return result;
}
}

View File

@ -121,7 +121,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
}
return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
.setUser2(query.getDataSource().toString())
.setUser4("search")
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
@ -173,7 +173,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
final ByteBuffer queryCacheKey = ByteBuffer
.allocate(
1 + 4 + granularityBytes.length + filterBytes.length +
querySpecBytes.length + dimensionsBytesSize
querySpecBytes.length + dimensionsBytesSize
)
.put(SEARCH_QUERY)
.put(Ints.toByteArray(query.getLimit()))

View File

@ -99,4 +99,23 @@ public class FragmentSearchQuerySpec implements SearchQuerySpec
"values=" + values +
"}";
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FragmentSearchQuerySpec that = (FragmentSearchQuerySpec) o;
if (values != null ? !values.equals(that.values) : that.values != null) return false;
return true;
}
@Override
public int hashCode()
{
return values != null ? values.hashCode() : 0;
}
}

View File

@ -73,4 +73,23 @@ public class InsensitiveContainsSearchQuerySpec implements SearchQuerySpec
"value=" + value +
"}";
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
InsensitiveContainsSearchQuerySpec that = (InsensitiveContainsSearchQuerySpec) o;
if (value != null ? !value.equals(that.value) : that.value != null) return false;
return true;
}
@Override
public int hashCode()
{
return value != null ? value.hashCode() : 0;
}
}

View File

@ -50,4 +50,9 @@ public class LexicographicSearchSortSpec implements SearchSortSpec
{
return "lexicographicSort";
}
@Override
public boolean equals(Object other) {
return (other instanceof LexicographicSearchSortSpec);
}
}

View File

@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Query;
import io.druid.query.Result;
import io.druid.query.filter.DimFilter;
@ -49,7 +50,7 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
@JsonCreator
public SearchQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("limit") int limit,
@ -181,13 +182,45 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
public String toString()
{
return "SearchQuery{" +
"dataSource='" + getDataSource() + '\'' +
", dimFilter=" + dimFilter +
", granularity='" + granularity + '\'' +
", dimensions=" + dimensions +
", querySpec=" + querySpec +
", querySegmentSpec=" + getQuerySegmentSpec() +
", limit=" + limit +
'}';
"dataSource='" + getDataSource() + '\'' +
", dimFilter=" + dimFilter +
", granularity='" + granularity + '\'' +
", dimensions=" + dimensions +
", querySpec=" + querySpec +
", querySegmentSpec=" + getQuerySegmentSpec() +
", limit=" + limit +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
SearchQuery that = (SearchQuery) o;
if (limit != that.limit) return false;
if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) return false;
if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) return false;
if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) return false;
if (querySpec != null ? !querySpec.equals(that.querySpec) : that.querySpec != null) return false;
if (sortSpec != null ? !sortSpec.equals(that.sortSpec) : that.sortSpec != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = super.hashCode();
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
result = 31 * result + (sortSpec != null ? sortSpec.hashCode() : 0);
result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
result = 31 * result + (querySpec != null ? querySpec.hashCode() : 0);
result = 31 * result + limit;
return result;
}
}

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.granularity.QueryGranularity;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Query;
import io.druid.query.Result;
import io.druid.query.filter.DimFilter;
@ -45,7 +46,7 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
@JsonCreator
public SelectQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("granularity") QueryGranularity granularity,
@ -146,4 +147,34 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
", pagingSpec=" + pagingSpec +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
SelectQuery that = (SelectQuery) o;
if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) return false;
if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) return false;
if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) return false;
if (metrics != null ? !metrics.equals(that.metrics) : that.metrics != null) return false;
if (pagingSpec != null ? !pagingSpec.equals(that.pagingSpec) : that.pagingSpec != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = super.hashCode();
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
result = 31 * result + (metrics != null ? metrics.hashCode() : 0);
result = 31 * result + (pagingSpec != null ? pagingSpec.hashCode() : 0);
return result;
}
}

View File

@ -123,7 +123,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
}
return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
.setUser2(query.getDataSource().toString())
.setUser4("Select")
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))

View File

@ -64,4 +64,23 @@ public class MultipleIntervalSegmentSpec implements QuerySegmentSpec
"intervals=" + intervals +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MultipleIntervalSegmentSpec that = (MultipleIntervalSegmentSpec) o;
if (intervals != null ? !intervals.equals(that.intervals) : that.intervals != null) return false;
return true;
}
@Override
public int hashCode()
{
return intervals != null ? intervals.hashCode() : 0;
}
}

View File

@ -91,4 +91,26 @@ public class MultipleSpecificSegmentSpec implements QuerySegmentSpec
"descriptors=" + descriptors +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MultipleSpecificSegmentSpec that = (MultipleSpecificSegmentSpec) o;
if (descriptors != null ? !descriptors.equals(that.descriptors) : that.descriptors != null) return false;
if (intervals != null ? !intervals.equals(that.intervals) : that.intervals != null) return false;
return true;
}
@Override
public int hashCode()
{
int result = descriptors != null ? descriptors.hashCode() : 0;
result = 31 * result + (intervals != null ? intervals.hashCode() : 0);
return result;
}
}

View File

@ -51,4 +51,23 @@ public class SpecificSegmentSpec implements QuerySegmentSpec
{
return walker.getQueryRunnerForSegments(query, Arrays.asList(descriptor));
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SpecificSegmentSpec that = (SpecificSegmentSpec) o;
if (descriptor != null ? !descriptor.equals(that.descriptor) : that.descriptor != null) return false;
return true;
}
@Override
public int hashCode()
{
return descriptor != null ? descriptor.hashCode() : 0;
}
}

View File

@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Query;
import io.druid.query.Result;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
@ -51,7 +52,7 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
@JsonCreator
public TimeBoundaryQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("context") Map<String, String> context
)

View File

@ -78,7 +78,7 @@ public class TimeBoundaryQueryQueryToolChest
public boolean apply(T input)
{
return input.getInterval().overlaps(first.getInterval()) || input.getInterval()
.overlaps(second.getInterval());
.overlaps(second.getInterval());
}
}
)
@ -117,7 +117,7 @@ public class TimeBoundaryQueryQueryToolChest
public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query)
{
return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
.setUser2(query.getDataSource().toString())
.setUser4(query.getType())
.setUser6("false")
.setUser10(query.getId());
@ -146,9 +146,9 @@ public class TimeBoundaryQueryQueryToolChest
public byte[] computeCacheKey(TimeBoundaryQuery query)
{
return ByteBuffer.allocate(2)
.put(TIMEBOUNDARY_QUERY)
.put(query.getCacheKey())
.array();
.put(TIMEBOUNDARY_QUERY)
.put(query.getCacheKey())
.array();
}
@Override

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.ImmutableList;
import io.druid.granularity.QueryGranularity;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Queries;
import io.druid.query.Query;
import io.druid.query.Result;
@ -48,7 +49,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
@JsonCreator
public TimeseriesQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("granularity") QueryGranularity granularity,
@ -132,14 +133,43 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
public String toString()
{
return "TimeseriesQuery{" +
"dataSource='" + getDataSource() + '\'' +
", querySegmentSpec=" + getQuerySegmentSpec() +
", dimFilter=" + dimFilter +
", granularity='" + granularity + '\'' +
", aggregatorSpecs=" + aggregatorSpecs +
", postAggregatorSpecs=" + postAggregatorSpecs +
", context=" + getContext() +
'}';
"dataSource='" + getDataSource() + '\'' +
", querySegmentSpec=" + getQuerySegmentSpec() +
", dimFilter=" + dimFilter +
", granularity='" + granularity + '\'' +
", aggregatorSpecs=" + aggregatorSpecs +
", postAggregatorSpecs=" + postAggregatorSpecs +
", context=" + getContext() +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
TimeseriesQuery that = (TimeseriesQuery) o;
if (aggregatorSpecs != null ? !aggregatorSpecs.equals(that.aggregatorSpecs) : that.aggregatorSpecs != null)
return false;
if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) return false;
if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) return false;
if (postAggregatorSpecs != null ? !postAggregatorSpecs.equals(that.postAggregatorSpecs) : that.postAggregatorSpecs != null)
return false;
return true;
}
@Override
public int hashCode()
{
int result = super.hashCode();
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0);
result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0);
return result;
}
}

View File

@ -123,7 +123,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
}
return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
.setUser2(query.getDataSource().toString())
.setUser4("timeseries")
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))

View File

@ -101,4 +101,23 @@ public class InvertedTopNMetricSpec implements TopNMetricSpec
{
delegate.initTopNAlgorithmSelector(selector);
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
InvertedTopNMetricSpec that = (InvertedTopNMetricSpec) o;
if (delegate != null ? !delegate.equals(that.delegate) : that.delegate != null) return false;
return true;
}
@Override
public int hashCode()
{
return delegate != null ? delegate.hashCode() : 0;
}
}

View File

@ -118,4 +118,23 @@ public class LexicographicTopNMetricSpec implements TopNMetricSpec
"previousStop='" + previousStop + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LexicographicTopNMetricSpec that = (LexicographicTopNMetricSpec) o;
if (previousStop != null ? !previousStop.equals(that.previousStop) : that.previousStop != null) return false;
return true;
}
@Override
public int hashCode()
{
return previousStop != null ? previousStop.hashCode() : 0;
}
}

View File

@ -157,4 +157,23 @@ public class NumericTopNMetricSpec implements TopNMetricSpec
"metric='" + metric + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
NumericTopNMetricSpec that = (NumericTopNMetricSpec) o;
if (metric != null ? !metric.equals(that.metric) : that.metric != null) return false;
return true;
}
@Override
public int hashCode()
{
return metric != null ? metric.hashCode() : 0;
}
}

View File

@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.druid.granularity.QueryGranularity;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Queries;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
@ -52,7 +53,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
@JsonCreator
public TopNQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("dimension") DimensionSpec dimensionSpec,
@JsonProperty("metric") TopNMetricSpec topNMetricSpec,
@JsonProperty("threshold") int threshold,
@ -208,4 +209,42 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
", postAggregatorSpecs=" + postAggregatorSpecs +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
TopNQuery topNQuery = (TopNQuery) o;
if (threshold != topNQuery.threshold) return false;
if (aggregatorSpecs != null ? !aggregatorSpecs.equals(topNQuery.aggregatorSpecs) : topNQuery.aggregatorSpecs != null)
return false;
if (dimFilter != null ? !dimFilter.equals(topNQuery.dimFilter) : topNQuery.dimFilter != null) return false;
if (dimensionSpec != null ? !dimensionSpec.equals(topNQuery.dimensionSpec) : topNQuery.dimensionSpec != null)
return false;
if (granularity != null ? !granularity.equals(topNQuery.granularity) : topNQuery.granularity != null) return false;
if (postAggregatorSpecs != null ? !postAggregatorSpecs.equals(topNQuery.postAggregatorSpecs) : topNQuery.postAggregatorSpecs != null)
return false;
if (topNMetricSpec != null ? !topNMetricSpec.equals(topNQuery.topNMetricSpec) : topNQuery.topNMetricSpec != null)
return false;
return true;
}
@Override
public int hashCode()
{
int result = super.hashCode();
result = 31 * result + (dimensionSpec != null ? dimensionSpec.hashCode() : 0);
result = 31 * result + (topNMetricSpec != null ? topNMetricSpec.hashCode() : 0);
result = 31 * result + threshold;
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0);
result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0);
return result;
}
}

View File

@ -21,6 +21,8 @@ package io.druid.query.topn;
import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.query.DataSource;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
@ -58,7 +60,7 @@ import java.util.Map;
*/
public class TopNQueryBuilder
{
private String dataSource;
private DataSource dataSource;
private DimensionSpec dimensionSpec;
private TopNMetricSpec topNMetricSpec;
private int threshold;
@ -71,7 +73,7 @@ public class TopNQueryBuilder
public TopNQueryBuilder()
{
dataSource = "";
dataSource = null;
dimensionSpec = null;
topNMetricSpec = null;
threshold = 0;
@ -83,7 +85,7 @@ public class TopNQueryBuilder
context = null;
}
public String getDataSource()
public DataSource getDataSource()
{
return dataSource;
}
@ -152,7 +154,7 @@ public class TopNQueryBuilder
public TopNQueryBuilder copy(TopNQuery query)
{
return new TopNQueryBuilder()
.dataSource(query.getDataSource())
.dataSource(query.getDataSource().toString())
.dimension(query.getDimensionSpec())
.metric(query.getTopNMetricSpec())
.threshold(query.getThreshold())
@ -180,6 +182,12 @@ public class TopNQueryBuilder
}
public TopNQueryBuilder dataSource(String d)
{
dataSource = new TableDataSource(d);
return this;
}
public TopNQueryBuilder dataSource(DataSource d)
{
dataSource = d;
return this;

View File

@ -128,7 +128,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
}
return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
.setUser2(query.getDataSource().toString())
.setUser4(String.format("topN/%s/%s", query.getThreshold(), query.getDimensionSpec().getDimension()))
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))

View File

@ -26,6 +26,7 @@ import io.druid.query.filter.BitmapIndexSelector;
import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.ValueMatcherFactory;
import io.druid.segment.data.Indexed;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.mozilla.javascript.Context;
import org.mozilla.javascript.Function;
@ -49,27 +50,33 @@ public class JavaScriptFilter implements Filter
{
final Context cx = Context.enter();
try {
ImmutableConciseSet conciseSet = ImmutableConciseSet.union(
FunctionalIterable.create(selector.getDimensionValues(dimension))
.filter(new Predicate<String>()
{
@Override
public boolean apply(@Nullable String input)
{
return predicate.applyInContext(cx, input);
}
})
.transform(
new com.google.common.base.Function<String, ImmutableConciseSet>()
{
@Override
public ImmutableConciseSet apply(@Nullable String input)
{
return selector.getConciseInvertedIndex(dimension, input);
}
}
)
);
final Indexed<String> dimValues = selector.getDimensionValues(dimension);
ImmutableConciseSet conciseSet;
if (dimValues == null) {
conciseSet = new ImmutableConciseSet();
} else {
conciseSet = ImmutableConciseSet.union(
FunctionalIterable.create(dimValues)
.filter(new Predicate<String>()
{
@Override
public boolean apply(@Nullable String input)
{
return predicate.applyInContext(cx, input);
}
})
.transform(
new com.google.common.base.Function<String, ImmutableConciseSet>()
{
@Override
public ImmutableConciseSet apply(@Nullable String input)
{
return selector.getConciseInvertedIndex(dimension, input);
}
}
)
);
}
return conciseSet;
} finally {
Context.exit();
@ -83,12 +90,14 @@ public class JavaScriptFilter implements Filter
return factory.makeValueMatcher(dimension, predicate);
}
static class JavaScriptPredicate implements Predicate<String> {
static class JavaScriptPredicate implements Predicate<String>
{
final ScriptableObject scope;
final Function fnApply;
final String script;
public JavaScriptPredicate(final String script) {
public JavaScriptPredicate(final String script)
{
Preconditions.checkNotNull(script, "script must not be null");
this.script = script;

View File

@ -126,8 +126,13 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public Iterable<Cursor> makeCursors(final Filter filter, final Interval interval, final QueryGranularity gran)
{
if (index.isEmpty()) {
return ImmutableList.of();
}
Interval actualIntervalTmp = interval;
final Interval dataInterval = new Interval(getMinTime().getMillis(), gran.next(getMaxTime().getMillis()));
if (!actualIntervalTmp.overlaps(dataInterval)) {
return ImmutableList.of();

View File

@ -0,0 +1,88 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/
package io.druid.query;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
public class DataSourceTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testSerialization() throws IOException
{
DataSource dataSource = new TableDataSource("somedatasource");
String json = jsonMapper.writeValueAsString(dataSource);
DataSource serdeDataSource = jsonMapper.readValue(json, DataSource.class);
Assert.assertEquals(dataSource, serdeDataSource);
}
@Test
public void testLegacyDataSource() throws IOException
{
DataSource dataSource = jsonMapper.readValue("\"somedatasource\"", DataSource.class);
Assert.assertEquals(new TableDataSource("somedatasource"), dataSource);
}
@Test
public void testTableDataSource() throws IOException
{
DataSource dataSource = jsonMapper.readValue("{\"type\":\"table\", \"name\":\"somedatasource\"}", DataSource.class);
Assert.assertEquals(new TableDataSource("somedatasource"), dataSource);
}
@Test
public void testQueryDataSource() throws IOException
{
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
String dataSourceJSON = "{\"type\":\"query\", \"query\":" + jsonMapper.writeValueAsString(query) + "}";
DataSource dataSource = jsonMapper.readValue(dataSourceJSON, DataSource.class);
Assert.assertEquals(new QueryDataSource(query), dataSource);
}
}

View File

@ -111,9 +111,15 @@ public class QueryRunnerTestHelper
public static final QuerySegmentSpec firstToThird = new MultipleIntervalSegmentSpec(
Arrays.asList(new Interval("2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z"))
);
public static final QuerySegmentSpec secondOnly = new MultipleIntervalSegmentSpec(
Arrays.asList(new Interval("2011-04-02T00:00:00.000Z/P1D"))
);
public static final QuerySegmentSpec fullOnInterval = new MultipleIntervalSegmentSpec(
Arrays.asList(new Interval("1970-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"))
);
public static final QuerySegmentSpec emptyInterval = new MultipleIntervalSegmentSpec(
Arrays.asList(new Interval("2020-04-02T00:00:00.000Z/P1D"))
);
@SuppressWarnings("unchecked")
public static Collection<?> makeQueryRunners(

View File

@ -38,11 +38,14 @@ import io.druid.granularity.QueryGranularity;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryToolChest;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.MaxAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.filter.JavaScriptDimFilter;
import io.druid.query.filter.RegexDimFilter;
import io.druid.query.groupby.having.EqualToHavingSpec;
import io.druid.query.groupby.having.GreaterThanHavingSpec;
@ -56,7 +59,9 @@ import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -90,22 +95,24 @@ public class GroupByQueryRunnerTest
config.setMaxIntermediateRows(10000);
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
new GroupByQueryEngine(
configSupplier,
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(1024 * 1024);
}
}
)
),
final GroupByQueryEngine engine = new GroupByQueryEngine(
configSupplier,
new GroupByQueryQueryToolChest(configSupplier)
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(1024 * 1024);
}
}
)
);
final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
engine,
configSupplier,
new GroupByQueryQueryToolChest(configSupplier, engine)
);
return Lists.newArrayList(
@ -167,8 +174,7 @@ public class GroupByQueryRunnerTest
createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L)
);
Iterable<Row> results = Sequences.toList(runner.run(query), Lists.<Row>newArrayList());
Iterable<Row> results = runQuery(query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@ -178,33 +184,33 @@ public class GroupByQueryRunnerTest
DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles");
GroupByQuery query = GroupByQuery.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setInterval("2011-03-31T00:00:00-07:00/2011-04-02T00:00:00-07:00")
.setDimensions(
Lists.newArrayList(
(DimensionSpec) new DefaultDimensionSpec(
"quality",
"alias"
)
)
)
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory(
"idx",
"index"
)
)
)
.setGranularity(
new PeriodGranularity(
new Period("P1D"),
null,
tz
)
)
.build();
.setDataSource(QueryRunnerTestHelper.dataSource)
.setInterval("2011-03-31T00:00:00-07:00/2011-04-02T00:00:00-07:00")
.setDimensions(
Lists.newArrayList(
(DimensionSpec) new DefaultDimensionSpec(
"quality",
"alias"
)
)
)
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory(
"idx",
"index"
)
)
)
.setGranularity(
new PeriodGranularity(
new Period("P1D"),
null,
tz
)
)
.build();
List<Row> expectedResults = Arrays.asList(
createExpectedRow(new DateTime("2011-03-31", tz), "alias", "automotive", "rows", 1L, "idx", 135L),
@ -228,11 +234,7 @@ public class GroupByQueryRunnerTest
createExpectedRow(new DateTime("2011-04-01", tz), "alias", "travel", "rows", 1L, "idx", 126L)
);
Iterable<Row> results = Sequences.toList(
runner.run(query),
Lists.<Row>newArrayList()
);
Iterable<Row> results = runQuery(query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@ -661,7 +663,21 @@ public class GroupByQueryRunnerTest
createExpectedRow("2011-04-01", "quality", "automotive", "rows", 2L)
);
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(configSupplier).mergeResults(runner);
final GroupByQueryEngine engine = new GroupByQueryEngine(
configSupplier,
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(1024 * 1024);
}
}
)
);
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
}
@ -696,7 +712,21 @@ public class GroupByQueryRunnerTest
);
TestHelper.assertExpectedObjects(expectedResults, runner.run(query), "normal");
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(configSupplier).mergeResults(runner);
final GroupByQueryEngine engine = new GroupByQueryEngine(
configSupplier,
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(1024 * 1024);
}
}
)
);
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
}
@ -731,10 +761,200 @@ public class GroupByQueryRunnerTest
);
TestHelper.assertExpectedObjects(expectedResults, runner.run(query), "normal");
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(configSupplier).mergeResults(runner);
final GroupByQueryEngine engine = new GroupByQueryEngine(
configSupplier,
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(1024 * 1024);
}
}
)
);
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
}
// A subquery identical to the query should yield identical results
@Test
public void testIdenticalSubquery()
{
GroupByQuery subquery = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setDimFilter(new JavaScriptDimFilter("quality", "function(dim){ return true; }"))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(subquery)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("alias", "alias")))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
new LongSumAggregatorFactory("rows", "rows"),
new LongSumAggregatorFactory("idx", "idx")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
List<Row> expectedResults = Arrays.asList(
createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L),
createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L),
createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L),
createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L),
createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L),
createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L),
createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L),
createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L),
createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L),
createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L),
createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L),
createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L),
createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L),
createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L),
createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L),
createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L),
createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L),
createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L)
);
// Subqueries are handled by the ToolChest
Iterable<Row> results = runQuery(query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testDifferentGroupingSubquery()
{
GroupByQuery subquery = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(subquery)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
new MaxAggregatorFactory("idx", "idx")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
List<Row> expectedResults = Arrays.asList(
createExpectedRow("2011-04-01", "idx", 2900.0),
createExpectedRow("2011-04-02", "idx", 2505.0)
);
Iterable<Row> results = runQuery(query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testDifferentIntervalSubquery()
{
GroupByQuery subquery = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(subquery)
.setQuerySegmentSpec(QueryRunnerTestHelper.secondOnly)
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
new MaxAggregatorFactory("idx", "idx")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
List<Row> expectedResults = Arrays.asList(
createExpectedRow("2011-04-02", "idx", 2505.0)
);
Iterable<Row> results = runQuery(query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testEmptySubquery()
{
GroupByQuery subquery = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.emptyInterval)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(subquery)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
new MaxAggregatorFactory("idx", "idx")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
Iterable<Row> results = runQuery(query);
Assert.assertFalse(results.iterator().hasNext());
}
private Iterable<Row> runQuery(GroupByQuery query)
{
QueryToolChest<Row, GroupByQuery> toolChest = factory.getToolchest();
Sequence<Row> queryResult = toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)).run(query);
return Sequences.toList(queryResult, Lists.<Row>newArrayList());
}
private Row createExpectedRow(final String timestamp, Object... vals)
{
return createExpectedRow(new DateTime(timestamp), vals);

View File

@ -0,0 +1,68 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/
package io.druid.query.groupby;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
public class GroupByQueryTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testQuerySerialization() throws IOException
{
Query query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
String json = jsonMapper.writeValueAsString(query);
Query serdeQuery = jsonMapper.readValue(json, Query.class);
Assert.assertEquals(query, serdeQuery);
}
}

View File

@ -56,22 +56,24 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
config.setMaxIntermediateRows(10000);
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
new GroupByQueryEngine(
configSupplier,
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(1024 * 1024);
}
}
)
),
final GroupByQueryEngine engine = new GroupByQueryEngine(
configSupplier,
new GroupByQueryQueryToolChest(configSupplier)
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(1024 * 1024);
}
}
)
);
final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
engine,
configSupplier,
new GroupByQueryQueryToolChest(configSupplier, engine)
);
final Collection<?> objects = QueryRunnerTestHelper.makeQueryRunners(factory);
@ -95,13 +97,13 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
return Sequences.map(
groupByRunner.run(
GroupByQuery.builder()
.setDataSource(tsQuery.getDataSource())
.setQuerySegmentSpec(tsQuery.getQuerySegmentSpec())
.setGranularity(tsQuery.getGranularity())
.setDimFilter(tsQuery.getDimensionsFilter())
.setAggregatorSpecs(tsQuery.getAggregatorSpecs())
.setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs())
.build()
.setDataSource(tsQuery.getDataSource())
.setQuerySegmentSpec(tsQuery.getQuerySegmentSpec())
.setGranularity(tsQuery.getGranularity())
.setDimFilter(tsQuery.getDimensionsFilter())
.setAggregatorSpecs(tsQuery.getAggregatorSpecs())
.setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs())
.build()
),
new Function<Row, Result<TimeseriesResultValue>>()
{

View File

@ -0,0 +1,54 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/
package io.druid.query.search;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryRunnerTestHelper;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
public class SearchQueryTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testQuerySerialization() throws IOException
{
Query query = Druids.newSearchQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.query("a")
.build();
String json = jsonMapper.writeValueAsString(query);
Query serdeQuery = jsonMapper.readValue(json, Query.class);
Assert.assertEquals(query, serdeQuery);
}
}

View File

@ -28,6 +28,7 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.TableDataSource;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.spec.LegacySegmentSpec;
import org.joda.time.DateTime;
@ -72,7 +73,7 @@ public class SelectQueryRunnerTest
public void testFullOnSelect()
{
SelectQuery query = new SelectQuery(
QueryRunnerTestHelper.dataSource,
new TableDataSource(QueryRunnerTestHelper.dataSource),
QueryRunnerTestHelper.fullOnInterval,
null,
QueryRunnerTestHelper.allGran,
@ -141,7 +142,7 @@ public class SelectQueryRunnerTest
public void testSelectWithDimsAndMets()
{
SelectQuery query = new SelectQuery(
QueryRunnerTestHelper.dataSource,
new TableDataSource(QueryRunnerTestHelper.dataSource),
QueryRunnerTestHelper.fullOnInterval,
null,
QueryRunnerTestHelper.allGran,
@ -201,7 +202,7 @@ public class SelectQueryRunnerTest
public void testSelectPagination()
{
SelectQuery query = new SelectQuery(
QueryRunnerTestHelper.dataSource,
new TableDataSource(QueryRunnerTestHelper.dataSource),
QueryRunnerTestHelper.fullOnInterval,
null,
QueryRunnerTestHelper.allGran,
@ -261,7 +262,7 @@ public class SelectQueryRunnerTest
public void testFullOnSelectWithFilter()
{
SelectQuery query = new SelectQuery(
QueryRunnerTestHelper.dataSource,
new TableDataSource(QueryRunnerTestHelper.dataSource),
new LegacySegmentSpec(new Interval("2011-01-12/2011-01-14")),
new SelectorDimFilter(QueryRunnerTestHelper.providerDimension, "spot"),
QueryRunnerTestHelper.dayGran,

View File

@ -0,0 +1,50 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/
package io.druid.query.timeboundary;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.Query;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
public class TimeBoundaryQueryTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testQuerySerialization() throws IOException
{
Query query = Druids.newTimeBoundaryQueryBuilder()
.dataSource("testing")
.build();
String json = jsonMapper.writeValueAsString(query);
Query serdeQuery = jsonMapper.readValue(json, Query.class);
Assert.assertEquals(query, serdeQuery);
}
}

View File

@ -0,0 +1,62 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/
package io.druid.query.timeseries;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.aggregation.PostAggregator;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
public class TimeseriesQueryTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testQuerySerialization() throws IOException
{
Query query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.dayGran)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
QueryRunnerTestHelper.indexDoubleSum
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.build();
String json = jsonMapper.writeValueAsString(query);
Query serdeQuery = jsonMapper.readValue(json, Query.class);
Assert.assertEquals(query, serdeQuery);
}
}

View File

@ -133,32 +133,32 @@ public class TopNQueryRunnerTest
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("provider", "total_market")
.put("rows", 186L)
.put("index", 215679.82879638672D)
.put("addRowsIndexConstant", 215866.82879638672D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1743.9217529296875D)
.put("minIndex", 792.3260498046875D)
.build(),
.put(providerDimension, "total_market")
.put("rows", 186L)
.put("index", 215679.82879638672D)
.put("addRowsIndexConstant", 215866.82879638672D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1743.9217529296875D)
.put("minIndex", 792.3260498046875D)
.build(),
ImmutableMap.<String, Object>builder()
.put("provider", "upfront")
.put("rows", 186L)
.put("index", 192046.1060180664D)
.put("addRowsIndexConstant", 192233.1060180664D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1870.06103515625D)
.put("minIndex", 545.9906005859375D)
.build(),
.put(providerDimension, "upfront")
.put("rows", 186L)
.put("index", 192046.1060180664D)
.put("addRowsIndexConstant", 192233.1060180664D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1870.06103515625D)
.put("minIndex", 545.9906005859375D)
.build(),
ImmutableMap.<String, Object>builder()
.put("provider", "spot")
.put("rows", 837L)
.put("index", 95606.57232284546D)
.put("addRowsIndexConstant", 96444.57232284546D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_9)
.put("maxIndex", 277.2735290527344D)
.put("minIndex", 59.02102279663086D)
.build()
.put(providerDimension, "spot")
.put("rows", 837L)
.put("index", 95606.57232284546D)
.put("addRowsIndexConstant", 96444.57232284546D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_9)
.put("maxIndex", 277.2735290527344D)
.put("minIndex", 59.02102279663086D)
.build()
)
)
)
@ -197,32 +197,32 @@ public class TopNQueryRunnerTest
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("provider", "total_market")
.put("rows", 186L)
.put("index", 215679.82879638672D)
.put("addRowsIndexConstant", 215866.82879638672D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1743.9217529296875D)
.put("minIndex", 792.3260498046875D)
.build(),
.put(providerDimension, "total_market")
.put("rows", 186L)
.put("index", 215679.82879638672D)
.put("addRowsIndexConstant", 215866.82879638672D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1743.9217529296875D)
.put("minIndex", 792.3260498046875D)
.build(),
ImmutableMap.<String, Object>builder()
.put("provider", "upfront")
.put("rows", 186L)
.put("index", 192046.1060180664D)
.put("addRowsIndexConstant", 192233.1060180664D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1870.06103515625D)
.put("minIndex", 545.9906005859375D)
.build(),
.put(providerDimension, "upfront")
.put("rows", 186L)
.put("index", 192046.1060180664D)
.put("addRowsIndexConstant", 192233.1060180664D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1870.06103515625D)
.put("minIndex", 545.9906005859375D)
.build(),
ImmutableMap.<String, Object>builder()
.put("provider", "spot")
.put("rows", 837L)
.put("index", 95606.57232284546D)
.put("addRowsIndexConstant", 96444.57232284546D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_9)
.put("maxIndex", 277.2735290527344D)
.put("minIndex", 59.02102279663086D)
.build()
.put(providerDimension, "spot")
.put("rows", 837L)
.put("index", 95606.57232284546D)
.put("addRowsIndexConstant", 96444.57232284546D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_9)
.put("maxIndex", 277.2735290527344D)
.put("minIndex", 59.02102279663086D)
.build()
)
)
)
@ -262,32 +262,32 @@ public class TopNQueryRunnerTest
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("provider", "spot")
.put("rows", 837L)
.put("index", 95606.57232284546D)
.put("addRowsIndexConstant", 96444.57232284546D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_9)
.put("maxIndex", 277.2735290527344D)
.put("minIndex", 59.02102279663086D)
.build(),
.put("provider", "spot")
.put("rows", 837L)
.put("index", 95606.57232284546D)
.put("addRowsIndexConstant", 96444.57232284546D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_9)
.put("maxIndex", 277.2735290527344D)
.put("minIndex", 59.02102279663086D)
.build(),
ImmutableMap.<String, Object>builder()
.put("provider", "total_market")
.put("rows", 186L)
.put("index", 215679.82879638672D)
.put("addRowsIndexConstant", 215866.82879638672D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1743.9217529296875D)
.put("minIndex", 792.3260498046875D)
.build(),
.put("provider", "total_market")
.put("rows", 186L)
.put("index", 215679.82879638672D)
.put("addRowsIndexConstant", 215866.82879638672D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1743.9217529296875D)
.put("minIndex", 792.3260498046875D)
.build(),
ImmutableMap.<String, Object>builder()
.put("provider", "upfront")
.put("rows", 186L)
.put("index", 192046.1060180664D)
.put("addRowsIndexConstant", 192233.1060180664D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1870.06103515625D)
.put("minIndex", 545.9906005859375D)
.build()
.put("provider", "upfront")
.put("rows", 186L)
.put("index", 192046.1060180664D)
.put("addRowsIndexConstant", 192233.1060180664D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1870.06103515625D)
.put("minIndex", 545.9906005859375D)
.build()
)
)
)
@ -318,21 +318,21 @@ public class TopNQueryRunnerTest
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>of(
"provider", "total_market",
providerDimension, "total_market",
"rows", 4L,
"index", 5351.814697265625D,
"addRowsIndexConstant", 5356.814697265625D,
"uniques", QueryRunnerTestHelper.UNIQUES_2
),
ImmutableMap.<String, Object>of(
"provider", "upfront",
providerDimension, "upfront",
"rows", 4L,
"index", 4875.669677734375D,
"addRowsIndexConstant", 4880.669677734375D,
"uniques", QueryRunnerTestHelper.UNIQUES_2
),
ImmutableMap.<String, Object>of(
"provider", "spot",
providerDimension, "spot",
"rows", 18L,
"index", 2231.8768157958984D,
"addRowsIndexConstant", 2250.8768157958984D,
@ -416,21 +416,21 @@ public class TopNQueryRunnerTest
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>of(
"provider", "total_market",
providerDimension, "total_market",
"rows", 4L,
"index", 5351.814697265625D,
"addRowsIndexConstant", 5356.814697265625D,
"uniques", QueryRunnerTestHelper.UNIQUES_2
),
ImmutableMap.<String, Object>of(
"provider", "upfront",
providerDimension, "upfront",
"rows", 4L,
"index", 4875.669677734375D,
"addRowsIndexConstant", 4880.669677734375D,
"uniques", QueryRunnerTestHelper.UNIQUES_2
),
ImmutableMap.<String, Object>of(
"provider", "spot",
providerDimension, "spot",
"rows", 18L,
"index", 2231.8768157958984D,
"addRowsIndexConstant", 2250.8768157958984D,
@ -465,14 +465,14 @@ public class TopNQueryRunnerTest
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>of(
"provider", "total_market",
providerDimension, "total_market",
"rows", 4L,
"index", 5351.814697265625D,
"addRowsIndexConstant", 5356.814697265625D,
"uniques", QueryRunnerTestHelper.UNIQUES_2
),
ImmutableMap.<String, Object>of(
"provider", "upfront",
providerDimension, "upfront",
"rows", 4L,
"index", 4875.669677734375D,
"addRowsIndexConstant", 4880.669677734375D,
@ -507,7 +507,7 @@ public class TopNQueryRunnerTest
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>of(
"provider", "upfront",
providerDimension, "upfront",
"rows", 4L,
"index", 4875.669677734375D,
"addRowsIndexConstant", 4880.669677734375D,
@ -542,21 +542,21 @@ public class TopNQueryRunnerTest
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>of(
"provider", "upfront",
providerDimension, "upfront",
"rows", 2L,
"index", 2591.68359375D,
"addRowsIndexConstant", 2594.68359375D,
"uniques", QueryRunnerTestHelper.UNIQUES_1
),
ImmutableMap.<String, Object>of(
"provider", "total_market",
providerDimension, "total_market",
"rows", 2L,
"index", 2508.39599609375D,
"addRowsIndexConstant", 2511.39599609375D,
"uniques", QueryRunnerTestHelper.UNIQUES_1
),
ImmutableMap.<String, Object>of(
"provider", "spot",
providerDimension, "spot",
"rows", 2L,
"index", 220.63774871826172D,
"addRowsIndexConstant", 223.63774871826172D,
@ -595,21 +595,21 @@ public class TopNQueryRunnerTest
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>of(
"provider", "upfront",
providerDimension, "upfront",
"rows", 1L,
"index", new Float(1447.341160).doubleValue(),
"addRowsIndexConstant", new Float(1449.341160).doubleValue(),
"uniques", QueryRunnerTestHelper.UNIQUES_1
),
ImmutableMap.<String, Object>of(
"provider", "total_market",
providerDimension, "total_market",
"rows", 1L,
"index", new Float(1314.839715).doubleValue(),
"addRowsIndexConstant", new Float(1316.839715).doubleValue(),
"uniques", QueryRunnerTestHelper.UNIQUES_1
),
ImmutableMap.<String, Object>of(
"provider", "spot",
providerDimension, "spot",
"rows", 1L,
"index", new Float(109.705815).doubleValue(),
"addRowsIndexConstant", new Float(111.705815).doubleValue(),
@ -644,14 +644,14 @@ public class TopNQueryRunnerTest
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>of(
"provider", "total_market",
providerDimension, "total_market",
"rows", 4L,
"index", 5351.814697265625D,
"addRowsIndexConstant", 5356.814697265625D,
"uniques", QueryRunnerTestHelper.UNIQUES_2
),
ImmutableMap.<String, Object>of(
"provider", "upfront",
providerDimension, "upfront",
"rows", 4L,
"index", 4875.669677734375D,
"addRowsIndexConstant", 4880.669677734375D,
@ -695,18 +695,18 @@ public class TopNQueryRunnerTest
public void testTopNWithNonExistentFilterMultiDim()
{
AndDimFilter andDimFilter = Druids.newAndDimFilterBuilder()
.fields(
Lists.<DimFilter>newArrayList(
Druids.newSelectorDimFilterBuilder()
.dimension(providerDimension)
.value("billyblank")
.build(),
Druids.newSelectorDimFilterBuilder()
.dimension(QueryRunnerTestHelper.qualityDimension)
.value("mezzanine")
.build()
)
).build();
.fields(
Lists.<DimFilter>newArrayList(
Druids.newSelectorDimFilterBuilder()
.dimension(providerDimension)
.value("billyblank")
.build(),
Druids.newSelectorDimFilterBuilder()
.dimension(QueryRunnerTestHelper.qualityDimension)
.value("mezzanine")
.build()
)
).build();
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
@ -966,21 +966,21 @@ public class TopNQueryRunnerTest
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>of(
"provider", "spot",
providerDimension, "spot",
"rows", 18L,
"index", 2231.8768157958984D,
"addRowsIndexConstant", 2250.8768157958984D,
"uniques", QueryRunnerTestHelper.UNIQUES_9
),
ImmutableMap.<String, Object>of(
"provider", "total_market",
providerDimension, "total_market",
"rows", 4L,
"index", 5351.814697265625D,
"addRowsIndexConstant", 5356.814697265625D,
"uniques", QueryRunnerTestHelper.UNIQUES_2
),
ImmutableMap.<String, Object>of(
"provider", "upfront",
providerDimension, "upfront",
"rows", 4L,
"index", 4875.669677734375D,
"addRowsIndexConstant", 4880.669677734375D,
@ -1014,14 +1014,14 @@ public class TopNQueryRunnerTest
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>of(
"provider", "total_market",
providerDimension, "total_market",
"rows", 4L,
"index", 5351.814697265625D,
"addRowsIndexConstant", 5356.814697265625D,
"uniques", QueryRunnerTestHelper.UNIQUES_2
),
ImmutableMap.<String, Object>of(
"provider", "upfront",
providerDimension, "upfront",
"rows", 4L,
"index", 4875.669677734375D,
"addRowsIndexConstant", 4880.669677734375D,
@ -1055,14 +1055,14 @@ public class TopNQueryRunnerTest
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>of(
"provider", "total_market",
providerDimension, "total_market",
"rows", 4L,
"index", 5351.814697265625D,
"addRowsIndexConstant", 5356.814697265625D,
"uniques", QueryRunnerTestHelper.UNIQUES_2
),
ImmutableMap.<String, Object>of(
"provider", "upfront",
providerDimension, "upfront",
"rows", 4L,
"index", 4875.669677734375D,
"addRowsIndexConstant", 4880.669677734375D,
@ -1100,21 +1100,21 @@ public class TopNQueryRunnerTest
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>of(
"provider", "s",
providerDimension, "s",
"rows", 18L,
"index", 2231.8768157958984D,
"addRowsIndexConstant", 2250.8768157958984D,
"uniques", QueryRunnerTestHelper.UNIQUES_9
),
ImmutableMap.<String, Object>of(
"provider", "t",
providerDimension, "t",
"rows", 4L,
"index", 5351.814697265625D,
"addRowsIndexConstant", 5356.814697265625D,
"uniques", QueryRunnerTestHelper.UNIQUES_2
),
ImmutableMap.<String, Object>of(
"provider", "u",
providerDimension, "u",
"rows", 4L,
"index", 4875.669677734375D,
"addRowsIndexConstant", 4880.669677734375D,
@ -1149,21 +1149,21 @@ public class TopNQueryRunnerTest
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>of(
"provider", "spot",
providerDimension, "spot",
"rows", 18L,
"index", 2231.8768157958984D,
"addRowsIndexConstant", 2250.8768157958984D,
"uniques", QueryRunnerTestHelper.UNIQUES_9
),
ImmutableMap.<String, Object>of(
"provider", "upfront",
providerDimension, "upfront",
"rows", 4L,
"index", 4875.669677734375D,
"addRowsIndexConstant", 4880.669677734375D,
"uniques", QueryRunnerTestHelper.UNIQUES_2
),
ImmutableMap.<String, Object>of(
"provider", "total_market",
providerDimension, "total_market",
"rows", 4L,
"index", 5351.814697265625D,
"addRowsIndexConstant", 5356.814697265625D,

View File

@ -0,0 +1,81 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/
package io.druid.query.topn;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Query;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.MaxAggregatorFactory;
import io.druid.query.aggregation.MinAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import static io.druid.query.QueryRunnerTestHelper.addRowsIndexConstant;
import static io.druid.query.QueryRunnerTestHelper.allGran;
import static io.druid.query.QueryRunnerTestHelper.commonAggregators;
import static io.druid.query.QueryRunnerTestHelper.dataSource;
import static io.druid.query.QueryRunnerTestHelper.fullOnInterval;
import static io.druid.query.QueryRunnerTestHelper.indexMetric;
import static io.druid.query.QueryRunnerTestHelper.providerDimension;
public class TopNQueryTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testQuerySerialization() throws IOException
{
Query query = new TopNQueryBuilder()
.dataSource(dataSource)
.granularity(allGran)
.dimension(providerDimension)
.metric(indexMetric)
.threshold(4)
.intervals(fullOnInterval)
.aggregators(
Lists.<AggregatorFactory>newArrayList(
Iterables.concat(
commonAggregators,
Lists.newArrayList(
new MaxAggregatorFactory("maxIndex", "index"),
new MinAggregatorFactory("minIndex", "index")
)
)
)
)
.postAggregators(Arrays.<PostAggregator>asList(addRowsIndexConstant))
.build();
String json = jsonMapper.writeValueAsString(query);
Query serdeQuery = jsonMapper.readValue(json, Query.class);
Assert.assertEquals(query, serdeQuery);
}
}

View File

@ -25,13 +25,16 @@ import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import io.druid.client.selector.ServerSelector;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.client.selector.ServerSelectorStrategy;
import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Client;
import io.druid.query.DataSource;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.TableDataSource;
import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
@ -232,10 +235,21 @@ public class BrokerServerView implements TimelineServerView
@Override
public VersionedIntervalTimeline<String, ServerSelector> getTimeline(String dataSource)
public VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource)
{
String table;
while (dataSource instanceof QueryDataSource) {
dataSource = ((QueryDataSource) dataSource).getQuery().getDataSource();
}
if (dataSource instanceof TableDataSource) {
table = ((TableDataSource) dataSource).getName();
} else {
throw new UnsupportedOperationException("Unsupported data source type: " + dataSource.getClass().getSimpleName());
}
synchronized (lock) {
return timelines.get(dataSource);
return timelines.get(table);
}
}

View File

@ -40,8 +40,8 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.cache.Cache;
import io.druid.client.selector.ServerSelector;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.guice.annotations.Smile;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.CacheStrategy;
@ -124,7 +124,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final boolean useCache = Boolean.parseBoolean(query.getContextValue("useCache", "true")) && strategy != null;
final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true"))
&& strategy != null;
&& strategy != null;
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
@ -140,6 +140,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final Query<T> rewrittenQuery = query.withOverriddenContext(contextBuilder.build());
VersionedIntervalTimeline<String, ServerSelector> timeline = serverView.getTimeline(query.getDataSource());
if (timeline == null) {
return Sequences.empty();
@ -285,8 +286,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
objectMapper.getFactory().createParser(cachedResult),
cacheObjectClazz
);
}
catch (IOException e) {
} catch (IOException e) {
throw Throwables.propagate(e);
}
}
@ -339,7 +339,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
CachePopulator cachePopulator = cachePopulatorMap.get(
String.format("%s_%s", segmentIdentifier, value.getInterval())
);
if(cachePopulator != null) {
if (cachePopulator != null) {
cachePopulator.populate(Iterables.transform(segmentResults, prepareForCache));
}
@ -424,8 +424,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
}
cache.put(key, valueBytes);
}
catch (IOException e) {
} catch (IOException e) {
throw Throwables.propagate(e);
}
}

View File

@ -20,6 +20,7 @@
package io.druid.client;
import io.druid.client.selector.ServerSelector;
import io.druid.query.DataSource;
import io.druid.query.QueryRunner;
import io.druid.timeline.VersionedIntervalTimeline;
@ -27,6 +28,6 @@ import io.druid.timeline.VersionedIntervalTimeline;
*/
public interface TimelineServerView extends ServerView
{
VersionedIntervalTimeline<String, ServerSelector> getTimeline(String dataSource);
VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource);
<T> QueryRunner<T> getQueryRunner(DruidServer server);
}

View File

@ -23,15 +23,17 @@ import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.loading.LocalDataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentPusherConfig;
import io.druid.segment.loading.LocalDataSegmentKiller;
import io.druid.segment.loading.OmniSegmentLoader;
import io.druid.segment.loading.SegmentLoader;
/**
*/
public class DataSegmentPusherPullerModule implements Module
public class LocalDataStorageDruidModule implements Module
{
@Override
public void configure(Binder binder)
@ -52,6 +54,11 @@ public class DataSegmentPusherPullerModule implements Module
.to(LocalDataSegmentPuller.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DataSegmentKiller.class))
.addBinding("local")
.to(LocalDataSegmentKiller.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class))
.addBinding("local")
.to(LocalDataSegmentPusher.class)

View File

@ -36,7 +36,7 @@ import io.druid.curator.CuratorModule;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.guice.AWSModule;
import io.druid.guice.AnnouncerModule;
import io.druid.guice.DataSegmentPusherPullerModule;
import io.druid.guice.LocalDataStorageDruidModule;
import io.druid.guice.DbConnectorModule;
import io.druid.guice.DruidGuiceExtensions;
import io.druid.guice.DruidProcessingModule;
@ -316,7 +316,7 @@ public class Initialization
new DbConnectorModule(),
new JacksonConfigManagerModule(),
new IndexingServiceDiscoveryModule(),
new DataSegmentPusherPullerModule(),
new LocalDataStorageDruidModule(),
new FirehoseModule()
);

View File

@ -0,0 +1,68 @@
package io.druid.segment.loading;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.util.Map;
/**
*/
public class LocalDataSegmentKiller implements DataSegmentKiller
{
private static final Logger log = new Logger(LocalDataSegmentKiller.class);
@Override
public void kill(DataSegment segment) throws SegmentLoadingException
{
final File path = getDirectory(segment);
log.info("segment[%s] maps to path[%s]", segment.getIdentifier(), path);
if (!path.isDirectory()) {
if (!path.delete()) {
log.error("Unable to delete file[%s].", path);
throw new SegmentLoadingException("Couldn't kill segment[%s]", segment.getIdentifier());
}
return;
}
final File[] files = path.listFiles();
int success = 0;
for (File file : files) {
if (!file.delete()) {
log.error("Unable to delete file[%s].", file);
} else {
++success;
}
}
if (success == 0 && files.length != 0) {
throw new SegmentLoadingException("Couldn't kill segment[%s]", segment.getIdentifier());
}
if (success < files.length) {
log.warn("Couldn't completely kill segment[%s]", segment.getIdentifier());
} else if (!path.delete()) {
log.warn("Unable to delete directory[%s].", path);
log.warn("Couldn't completely kill segment[%s]", segment.getIdentifier());
}
}
private File getDirectory(DataSegment segment) throws SegmentLoadingException
{
final Map<String, Object> loadSpec = segment.getLoadSpec();
final File path = new File(MapUtils.getString(loadSpec, "path"));
if (!path.exists()) {
throw new SegmentLoadingException("Asked to load path[%s], but it doesn't exist.", path);
}
return path.getParentFile();
}
}

View File

@ -30,6 +30,7 @@ import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.query.DataSource;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
@ -39,6 +40,7 @@ import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;
import io.druid.query.TableDataSource;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.Sink;
import org.joda.time.DateTime;
@ -96,6 +98,7 @@ public class RealtimeManager implements QuerySegmentWalker
Closeables.closeQuietly(chief);
}
}
public FireDepartmentMetrics getMetrics(String datasource)
{
FireChief chief = chiefs.get(datasource);
@ -108,7 +111,7 @@ public class RealtimeManager implements QuerySegmentWalker
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
final FireChief chief = chiefs.get(query.getDataSource());
final FireChief chief = chiefs.get(getDataSourceName(query));
return chief == null ? new NoopQueryRunner<T>() : chief.getQueryRunner(query);
}
@ -116,11 +119,28 @@ public class RealtimeManager implements QuerySegmentWalker
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
{
final FireChief chief = chiefs.get(query.getDataSource());
final FireChief chief = chiefs.get(getDataSourceName(query));
return chief == null ? new NoopQueryRunner<T>() : chief.getQueryRunner(query);
}
private <T> String getDataSourceName(Query<T> query)
{
DataSource dataSource = query.getDataSource();
if (!(dataSource instanceof TableDataSource)) {
throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported");
}
String dataSourceName;
try {
dataSourceName = ((TableDataSource) query.getDataSource()).getName();
} catch (ClassCastException e) {
throw new UnsupportedOperationException("Subqueries are only supported in the broker");
}
return dataSourceName;
}
private class FireChief extends Thread implements Closeable
{
private final FireDepartment fireDepartment;
@ -152,8 +172,7 @@ public class RealtimeManager implements QuerySegmentWalker
log.info("Someone get us a plumber!");
plumber = fireDepartment.findPlumber();
log.info("We have our plumber!");
}
catch (IOException e) {
} catch (IOException e) {
throw Throwables.propagate(e);
}
}
@ -180,8 +199,7 @@ public class RealtimeManager implements QuerySegmentWalker
try {
try {
inputRow = firehose.nextRow();
}
catch (Exception e) {
} catch (Exception e) {
log.debug(e, "thrown away line due to exception, considering unparseable");
metrics.incrementUnparseable();
continue;
@ -206,8 +224,7 @@ public class RealtimeManager implements QuerySegmentWalker
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
metrics.incrementProcessed();
}
catch (FormattedException e) {
} catch (FormattedException e) {
log.info(e, "unparseable line: %s", e.getDetails());
metrics.incrementUnparseable();
continue;
@ -215,16 +232,15 @@ public class RealtimeManager implements QuerySegmentWalker
}
} catch (RuntimeException e) {
log.makeAlert(e, "RuntimeException aborted realtime processing[%s]", fireDepartment.getSchema().getDataSource())
.emit();
.emit();
normalExit = false;
throw e;
} catch (Error e) {
log.makeAlert(e, "Exception aborted realtime processing[%s]", fireDepartment.getSchema().getDataSource())
.emit();
.emit();
normalExit = false;
throw e;
}
finally {
} finally {
Closeables.closeQuietly(firehose);
if (normalExit) {
plumber.finishJob();

View File

@ -131,7 +131,7 @@ public class QueryResource
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
.setUser2(query.getDataSource().toString())
.setUser4(query.getType())
.setUser5(query.getIntervals().get(0).toString())
.setUser6(String.valueOf(query.hasFilters()))

View File

@ -31,6 +31,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.CountingMap;
import io.druid.guice.annotations.Processing;
import io.druid.query.BySegmentQueryRunner;
import io.druid.query.DataSource;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.NoopQueryRunner;
@ -42,6 +43,7 @@ import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.ReferenceCountingSegmentQueryRunner;
import io.druid.query.SegmentDescriptor;
import io.druid.query.TableDataSource;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.query.spec.SpecificSegmentQueryRunner;
import io.druid.query.spec.SpecificSegmentSpec;
@ -118,6 +120,7 @@ public class ServerManager implements QuerySegmentWalker
/**
* Load a single segment.
*
* @param segment segment to load
* @return true if the segment was newly loaded, false if it was already loaded
* @throws SegmentLoadingException if the segment cannot be loaded
@ -127,12 +130,10 @@ public class ServerManager implements QuerySegmentWalker
final Segment adapter;
try {
adapter = segmentLoader.getSegment(segment);
}
catch (SegmentLoadingException e) {
} catch (SegmentLoadingException e) {
try {
segmentLoader.cleanup(segment);
}
catch (SegmentLoadingException e1) {
} catch (SegmentLoadingException e1) {
// ignore
}
throw e;
@ -204,12 +205,11 @@ public class ServerManager implements QuerySegmentWalker
try {
log.info("Attempting to close segment %s", segment.getIdentifier());
oldQueryable.close();
}
catch (IOException e) {
} catch (IOException e) {
log.makeAlert(e, "Exception closing segment")
.addData("dataSource", dataSource)
.addData("segmentId", segment.getIdentifier())
.emit();
.addData("dataSource", dataSource)
.addData("segmentId", segment.getIdentifier())
.emit();
}
} else {
log.info(
@ -233,7 +233,19 @@ public class ServerManager implements QuerySegmentWalker
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(query.getDataSource());
DataSource dataSource = query.getDataSource();
if (!(dataSource instanceof TableDataSource)) {
throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported");
}
String dataSourceName;
try {
dataSourceName = ((TableDataSource) query.getDataSource()).getName();
} catch (ClassCastException e) {
throw new UnsupportedOperationException("Subqueries are only supported in the broker");
}
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSourceName);
if (timeline == null) {
return new NoopQueryRunner<T>();
@ -294,6 +306,7 @@ public class ServerManager implements QuerySegmentWalker
Predicates.<QueryRunner<T>>notNull()
);
return new FinalizeResultsQueryRunner<T>(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest);
}
@ -303,14 +316,21 @@ public class ServerManager implements QuerySegmentWalker
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {
log.makeAlert("Unknown query type, [%s]", query.getClass())
.addData("dataSource", query.getDataSource())
.emit();
.addData("dataSource", query.getDataSource())
.emit();
return new NoopQueryRunner<T>();
}
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(query.getDataSource());
String dataSourceName;
try {
dataSourceName = ((TableDataSource) query.getDataSource()).getName();
} catch (ClassCastException e) {
throw new UnsupportedOperationException("Subqueries are only supported in the broker");
}
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSourceName);
if (timeline == null) {
return new NoopQueryRunner<T>();

View File

@ -35,11 +35,6 @@ import com.metamx.common.guava.MergeIterable;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.nary.TrinaryFn;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNQueryBuilder;
import io.druid.query.topn.TopNQueryConfig;
import io.druid.query.topn.TopNQueryQueryToolChest;
import io.druid.query.topn.TopNResultValue;
import io.druid.client.cache.Cache;
import io.druid.client.cache.MapCache;
import io.druid.client.selector.QueryableDruidServer;
@ -49,6 +44,7 @@ import io.druid.granularity.PeriodGranularity;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.DataSource;
import io.druid.query.Druids;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.Query;
@ -57,6 +53,7 @@ import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.query.SegmentDescriptor;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
@ -76,6 +73,11 @@ import io.druid.query.timeboundary.TimeBoundaryResultValue;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNQueryBuilder;
import io.druid.query.topn.TopNQueryConfig;
import io.druid.query.topn.TopNQueryQueryToolChest;
import io.druid.query.topn.TopNResultValue;
import io.druid.segment.TestHelper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline;
@ -211,13 +213,13 @@ public class CachingClusteredClientTest
public void testTimeseriesCaching() throws Exception
{
final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
testQueryCaching(
builder.build(),
@ -262,9 +264,9 @@ public class CachingClusteredClientTest
),
client.run(
builder.intervals("2011-01-01/2011-01-10")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
)
);
}
@ -274,13 +276,13 @@ public class CachingClusteredClientTest
public void testTimeseriesCachingTimeZone() throws Exception
{
final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(PT1H_TZ_GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(PT1H_TZ_GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
testQueryCaching(
builder.build(),
@ -302,9 +304,9 @@ public class CachingClusteredClientTest
),
client.run(
builder.intervals("2011-11-04/2011-11-08")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
)
);
}
@ -313,18 +315,18 @@ public class CachingClusteredClientTest
public void testDisableUseCache() throws Exception
{
final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS);
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS);
testQueryCaching(
1,
true,
builder.context(ImmutableMap.of("useCache", "false",
"populateCache", "true")).build(),
"populateCache", "true")).build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
);
@ -338,7 +340,7 @@ public class CachingClusteredClientTest
1,
false,
builder.context(ImmutableMap.of("useCache", "false",
"populateCache", "false")).build(),
"populateCache", "false")).build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
);
@ -350,7 +352,7 @@ public class CachingClusteredClientTest
1,
false,
builder.context(ImmutableMap.of("useCache", "true",
"populateCache", "false")).build(),
"populateCache", "false")).build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
);
@ -419,10 +421,10 @@ public class CachingClusteredClientTest
),
client.run(
builder.intervals("2011-01-01/2011-01-10")
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
)
);
}
@ -464,10 +466,10 @@ public class CachingClusteredClientTest
),
client.run(
builder.intervals("2011-11-04/2011-11-08")
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
)
);
}
@ -530,10 +532,10 @@ public class CachingClusteredClientTest
),
client.run(
builder.intervals("2011-01-01/2011-01-10")
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
)
);
}
@ -543,7 +545,7 @@ public class CachingClusteredClientTest
{
testQueryCaching(
new SearchQuery(
DATA_SOURCE,
new TableDataSource(DATA_SOURCE),
DIM_FILTER,
GRANULARITY,
1000,
@ -579,7 +581,8 @@ public class CachingClusteredClientTest
);
}
public void testQueryCaching(final Query query, Object... args) {
public void testQueryCaching(final Query query, Object... args)
{
testQueryCaching(3, true, query, args);
}
@ -634,8 +637,8 @@ public class CachingClusteredClientTest
EasyMock.expect(serverView.getQueryRunner(server))
.andReturn(expectations.getQueryRunner())
.once();
.andReturn(expectations.getQueryRunner())
.once();
final Capture<? extends Query> capture = new Capture();
queryCaptures.add(capture);
@ -652,8 +655,8 @@ public class CachingClusteredClientTest
}
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results))
.once();
.andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results))
.once();
} else if (query instanceof TopNQuery) {
List<String> segmentIds = Lists.newArrayList();
@ -665,8 +668,8 @@ public class CachingClusteredClientTest
results.add(expectation.getResults());
}
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableTopNResults(segmentIds, intervals, results))
.once();
.andReturn(toQueryableTopNResults(segmentIds, intervals, results))
.once();
} else if (query instanceof SearchQuery) {
List<String> segmentIds = Lists.newArrayList();
List<Interval> intervals = Lists.newArrayList();
@ -677,8 +680,8 @@ public class CachingClusteredClientTest
results.add(expectation.getResults());
}
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableSearchResults(segmentIds, intervals, results))
.once();
.andReturn(toQueryableSearchResults(segmentIds, intervals, results))
.once();
} else if (query instanceof TimeBoundaryQuery) {
List<String> segmentIds = Lists.newArrayList();
List<Interval> intervals = Lists.newArrayList();
@ -689,8 +692,8 @@ public class CachingClusteredClientTest
results.add(expectation.getResults());
}
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results))
.once();
.andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results))
.once();
} else {
throw new ISE("Unknown query type[%s]", query.getClass());
}
@ -757,13 +760,12 @@ public class CachingClusteredClientTest
// make sure all the queries were sent down as 'bySegment'
for (Capture queryCapture : queryCaptures) {
Query capturedQuery = (Query) queryCapture.getValue();
if(expectBySegment) {
if (expectBySegment) {
Assert.assertEquals("true", capturedQuery.getContextValue("bySegment"));
}
else {
} else {
Assert.assertTrue(
capturedQuery.getContextValue("bySegment") == null ||
capturedQuery.getContextValue("bySegment").equals("false")
capturedQuery.getContextValue("bySegment").equals("false")
);
}
}
@ -818,7 +820,8 @@ public class CachingClusteredClientTest
}
timeline.add(queryIntervals.get(k), String.valueOf(k), chunk);
}
} return serverExpectationList;
}
return serverExpectationList;
}
private Sequence<Result<TimeseriesResultValue>> toQueryableTimeseriesResults(
@ -828,35 +831,35 @@ public class CachingClusteredClientTest
Iterable<Iterable<Result<TimeseriesResultValue>>> results
)
{
if(bySegment) {
return Sequences.simple(
FunctionalIterable
.create(segmentIds)
.trinaryTransform(
intervals,
results,
new TrinaryFn<String, Interval, Iterable<Result<TimeseriesResultValue>>, Result<TimeseriesResultValue>>()
{
@Override
@SuppressWarnings("unchecked")
public Result<TimeseriesResultValue> apply(
final String segmentId,
final Interval interval,
final Iterable<Result<TimeseriesResultValue>> results
)
if (bySegment) {
return Sequences.simple(
FunctionalIterable
.create(segmentIds)
.trinaryTransform(
intervals,
results,
new TrinaryFn<String, Interval, Iterable<Result<TimeseriesResultValue>>, Result<TimeseriesResultValue>>()
{
return new Result(
results.iterator().next().getTimestamp(),
new BySegmentResultValueClass(
Lists.newArrayList(results),
segmentId,
interval
)
);
@Override
@SuppressWarnings("unchecked")
public Result<TimeseriesResultValue> apply(
final String segmentId,
final Interval interval,
final Iterable<Result<TimeseriesResultValue>> results
)
{
return new Result(
results.iterator().next().getTimestamp(),
new BySegmentResultValueClass(
Lists.newArrayList(results),
segmentId,
interval
)
);
}
}
}
)
);
)
);
} else {
return Sequences.simple(Iterables.concat(results));
}
@ -994,36 +997,36 @@ public class CachingClusteredClientTest
}
private Iterable<BySegmentResultValueClass<TimeseriesResultValue>> makeBySegmentTimeResults
(Object... objects)
{
if (objects.length % 5 != 0) {
throw new ISE("makeTimeResults must be passed arguments in groups of 5, got[%d]", objects.length);
}
List<BySegmentResultValueClass<TimeseriesResultValue>> retVal = Lists.newArrayListWithCapacity(objects.length / 5);
for (int i = 0; i < objects.length; i += 5) {
retVal.add(
new BySegmentResultValueClass<TimeseriesResultValue>(
Lists.newArrayList(
new TimeseriesResultValue(
ImmutableMap.of(
"rows", objects[i + 1],
"imps", objects[i + 2],
"impers", objects[i + 2],
"avg_imps_per_row",
((Number) objects[i + 2]).doubleValue() / ((Number) objects[i + 1]).doubleValue()
)
)
),
(String)objects[i+3],
(Interval)objects[i+4]
)
);
}
return retVal;
(Object... objects)
{
if (objects.length % 5 != 0) {
throw new ISE("makeTimeResults must be passed arguments in groups of 5, got[%d]", objects.length);
}
List<BySegmentResultValueClass<TimeseriesResultValue>> retVal = Lists.newArrayListWithCapacity(objects.length / 5);
for (int i = 0; i < objects.length; i += 5) {
retVal.add(
new BySegmentResultValueClass<TimeseriesResultValue>(
Lists.newArrayList(
new TimeseriesResultValue(
ImmutableMap.of(
"rows", objects[i + 1],
"imps", objects[i + 2],
"impers", objects[i + 2],
"avg_imps_per_row",
((Number) objects[i + 2]).doubleValue() / ((Number) objects[i + 1]).doubleValue()
)
)
),
(String) objects[i + 3],
(Interval) objects[i + 4]
)
);
}
return retVal;
}
private Iterable<Result<TimeseriesResultValue>> makeRenamedTimeResults
(Object... objects)
{
@ -1156,13 +1159,13 @@ public class CachingClusteredClientTest
return new CachingClusteredClient(
new MapQueryToolChestWarehouse(
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
.put(
TimeseriesQuery.class,
new TimeseriesQueryQueryToolChest(new QueryConfig())
)
.put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig()))
.put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig()))
.build()
.put(
TimeseriesQuery.class,
new TimeseriesQueryQueryToolChest(new QueryConfig())
)
.put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig()))
.put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig()))
.build()
),
new TimelineServerView()
{
@ -1172,7 +1175,7 @@ public class CachingClusteredClientTest
}
@Override
public VersionedIntervalTimeline<String, ServerSelector> getTimeline(String dataSource)
public VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource)
{
return timeline;
}