finalize variables and optimize imports

This commit is contained in:
Yuval Oren 2014-03-06 15:03:43 -08:00
parent be1ef3a161
commit 5601c51ae6
27 changed files with 404 additions and 340 deletions

View File

@ -19,7 +19,6 @@
package io.druid.query; package io.druid.query;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;

View File

@ -23,25 +23,14 @@ package io.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.query.search.search.SearchQuery;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.topn.TopNQuery;
public class QueryDataSource implements DataSource public class QueryDataSource implements DataSource
{ {
@JsonProperty @JsonProperty
private Query query; private final Query query;
public QueryDataSource() @JsonCreator
{ public QueryDataSource(@JsonProperty("query") Query query)
}
public QueryDataSource(Query query)
{ {
this.query = query; this.query = query;
} }
@ -51,11 +40,6 @@ public class QueryDataSource implements DataSource
return query; return query;
} }
public void setQuery(Query query)
{
this.query = query;
}
public String toString() { return query.toString(); } public String toString() { return query.toString(); }
@Override @Override

View File

@ -22,7 +22,6 @@
package io.druid.query; package io.druid.query;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import org.joda.time.Period;
/** /**
* If there's a subquery, run it instead of the outer query * If there's a subquery, run it instead of the outer query
@ -41,9 +40,8 @@ public class SubqueryQueryRunner<T> implements QueryRunner<T>
{ {
DataSource dataSource = query.getDataSource(); DataSource dataSource = query.getDataSource();
if (dataSource instanceof QueryDataSource) { if (dataSource instanceof QueryDataSource) {
return run((Query<T>) ((QueryDataSource)dataSource).getQuery()); return run((Query<T>) ((QueryDataSource) dataSource).getQuery());
} } else {
else {
return baseRunner.run(query); return baseRunner.run(query);
} }
} }

View File

@ -26,18 +26,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class TableDataSource implements DataSource public class TableDataSource implements DataSource
{ {
@JsonProperty @JsonProperty
private String name; private final String name;
@JsonCreator @JsonCreator
public TableDataSource() public TableDataSource(@JsonProperty("name") String name)
{ {
this.name = name.toLowerCase();
}
@JsonCreator
public TableDataSource(String name)
{
this.name = name==null? name : name.toLowerCase();
} }
public String getName() public String getName()
@ -45,11 +39,6 @@ public class TableDataSource implements DataSource
return name; return name;
} }
public void setName(String name)
{
this.name = name;
}
public String toString() { return name; } public String toString() { return name; }
@Override @Override

View File

@ -32,7 +32,12 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import io.druid.data.input.Row; import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.*; import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Queries;
import io.druid.query.Query;
import io.druid.query.QueryDataSource;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DefaultDimensionSpec;

View File

@ -24,9 +24,6 @@ import com.google.common.collect.Lists;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row; import io.druid.data.input.Row;
import io.druid.data.input.Rows; import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;

View File

@ -34,7 +34,13 @@ import com.metamx.common.guava.Sequences;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.data.input.MapBasedRow; import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row; import io.druid.data.input.Row;
import io.druid.query.*; import io.druid.query.DataSource;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.SubqueryQueryRunner;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndex;
@ -93,7 +99,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
if (dataSource instanceof QueryDataSource) { if (dataSource instanceof QueryDataSource) {
GroupByQuery subquery; GroupByQuery subquery;
try { try {
subquery = (GroupByQuery) ((QueryDataSource)dataSource).getQuery(); subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery();
} catch (ClassCastException e) { } catch (ClassCastException e) {
throw new UnsupportedOperationException("Subqueries must be of type 'group by'"); throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
} }
@ -101,8 +107,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
IncrementalIndexStorageAdapter adapter IncrementalIndexStorageAdapter adapter
= new IncrementalIndexStorageAdapter(makeIncrementalIndex(subquery, subqueryResult)); = new IncrementalIndexStorageAdapter(makeIncrementalIndex(subquery, subqueryResult));
result = engine.process(query, adapter); result = engine.process(query, adapter);
} } else {
else {
result = runner.run(query); result = runner.run(query);
} }

View File

@ -34,7 +34,11 @@ import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence; import io.druid.collections.OrderedMergeSequence;
import io.druid.common.utils.JodaUtils; import io.druid.common.utils.JodaUtils;
import io.druid.query.*; import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.ResultMergeQueryRunner;
import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.metadata.metadata.ColumnAnalysis; import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.SegmentAnalysis; import io.druid.query.metadata.metadata.SegmentAnalysis;
@ -175,9 +179,9 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
{ {
byte[] includerBytes = query.getToInclude().getCacheKey(); byte[] includerBytes = query.getToInclude().getCacheKey();
return ByteBuffer.allocate(1 + includerBytes.length) return ByteBuffer.allocate(1 + includerBytes.length)
.put(SEGMENT_METADATA_CACHE_PREFIX) .put(SEGMENT_METADATA_CACHE_PREFIX)
.put(includerBytes) .put(includerBytes)
.array(); .array();
} }
@Override @Override

View File

@ -37,7 +37,14 @@ import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.nary.BinaryFn; import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence; import io.druid.collections.OrderedMergeSequence;
import io.druid.query.*; import io.druid.query.CacheStrategy;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.query.ResultGranularTimestampComparator;
import io.druid.query.ResultMergeQueryRunner;
import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.filter.DimFilter; import io.druid.query.filter.DimFilter;
import io.druid.query.search.search.SearchHit; import io.druid.query.search.search.SearchHit;
@ -166,7 +173,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
final ByteBuffer queryCacheKey = ByteBuffer final ByteBuffer queryCacheKey = ByteBuffer
.allocate( .allocate(
1 + 4 + granularityBytes.length + filterBytes.length + 1 + 4 + granularityBytes.length + filterBytes.length +
querySpecBytes.length + dimensionsBytesSize querySpecBytes.length + dimensionsBytesSize
) )
.put(SEARCH_QUERY) .put(SEARCH_QUERY)
.put(Ints.toByteArray(query.getLimit())) .put(Ints.toByteArray(query.getLimit()))

View File

@ -25,7 +25,10 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.*; import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Query;
import io.druid.query.Result;
import io.druid.query.filter.DimFilter; import io.druid.query.filter.DimFilter;
import io.druid.query.search.SearchResultValue; import io.druid.query.search.SearchResultValue;
import io.druid.query.spec.QuerySegmentSpec; import io.druid.query.spec.QuerySegmentSpec;
@ -179,14 +182,14 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
public String toString() public String toString()
{ {
return "SearchQuery{" + return "SearchQuery{" +
"dataSource='" + getDataSource() + '\'' + "dataSource='" + getDataSource() + '\'' +
", dimFilter=" + dimFilter + ", dimFilter=" + dimFilter +
", granularity='" + granularity + '\'' + ", granularity='" + granularity + '\'' +
", dimensions=" + dimensions + ", dimensions=" + dimensions +
", querySpec=" + querySpec + ", querySpec=" + querySpec +
", querySegmentSpec=" + getQuerySegmentSpec() + ", querySegmentSpec=" + getQuerySegmentSpec() +
", limit=" + limit + ", limit=" + limit +
'}'; '}';
} }
@Override @Override

View File

@ -24,7 +24,10 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import io.druid.query.*; import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Query;
import io.druid.query.Result;
import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.QuerySegmentSpec; import io.druid.query.spec.QuerySegmentSpec;
import org.joda.time.DateTime; import org.joda.time.DateTime;

View File

@ -31,7 +31,12 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence; import io.druid.collections.OrderedMergeSequence;
import io.druid.query.*; import io.druid.query.BySegmentSkippingQueryRunner;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.timeline.LogicalSegment; import io.druid.timeline.LogicalSegment;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -73,7 +78,7 @@ public class TimeBoundaryQueryQueryToolChest
public boolean apply(T input) public boolean apply(T input)
{ {
return input.getInterval().overlaps(first.getInterval()) || input.getInterval() return input.getInterval().overlaps(first.getInterval()) || input.getInterval()
.overlaps(second.getInterval()); .overlaps(second.getInterval());
} }
} }
) )
@ -141,9 +146,9 @@ public class TimeBoundaryQueryQueryToolChest
public byte[] computeCacheKey(TimeBoundaryQuery query) public byte[] computeCacheKey(TimeBoundaryQuery query)
{ {
return ByteBuffer.allocate(2) return ByteBuffer.allocate(2)
.put(TIMEBOUNDARY_QUERY) .put(TIMEBOUNDARY_QUERY)
.put(query.getCacheKey()) .put(query.getCacheKey())
.array(); .array();
} }
@Override @Override

View File

@ -24,7 +24,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.*; import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Queries;
import io.druid.query.Query;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.PostAggregator;
import io.druid.query.filter.DimFilter; import io.druid.query.filter.DimFilter;
@ -129,14 +133,14 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
public String toString() public String toString()
{ {
return "TimeseriesQuery{" + return "TimeseriesQuery{" +
"dataSource='" + getDataSource() + '\'' + "dataSource='" + getDataSource() + '\'' +
", querySegmentSpec=" + getQuerySegmentSpec() + ", querySegmentSpec=" + getQuerySegmentSpec() +
", dimFilter=" + dimFilter + ", dimFilter=" + dimFilter +
", granularity='" + granularity + '\'' + ", granularity='" + granularity + '\'' +
", aggregatorSpecs=" + aggregatorSpecs + ", aggregatorSpecs=" + aggregatorSpecs +
", postAggregatorSpecs=" + postAggregatorSpecs + ", postAggregatorSpecs=" + postAggregatorSpecs +
", context=" + getContext() + ", context=" + getContext() +
'}'; '}';
} }
@Override @Override

View File

@ -22,8 +22,6 @@ package io.druid.query.timeseries;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import io.druid.query.DataSource;
import io.druid.query.Query;
import io.druid.query.QueryRunnerHelper; import io.druid.query.QueryRunnerHelper;
import io.druid.query.Result; import io.druid.query.Result;
import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.Aggregator;

View File

@ -32,7 +32,16 @@ import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence; import io.druid.collections.OrderedMergeSequence;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.*; import io.druid.query.CacheStrategy;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryCacheHelper;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.query.ResultGranularTimestampComparator;
import io.druid.query.ResultMergeQueryRunner;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.PostAggregator;

View File

@ -22,7 +22,13 @@ package io.druid.query.timeseries;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import io.druid.query.*; import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.segment.Segment; import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter; import io.druid.segment.StorageAdapter;

View File

@ -24,7 +24,10 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.*; import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Queries;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec; import io.druid.query.dimension.DimensionSpec;

View File

@ -31,7 +31,11 @@ import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator; import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.QuerySegmentSpec; import io.druid.query.spec.QuerySegmentSpec;
import io.druid.segment.*; import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.TestIndex;
import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndex;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;

View File

@ -22,22 +22,14 @@
package io.druid.query.search; package io.druid.query.search;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.druid.jackson.DefaultObjectMapper; import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids; import io.druid.query.Druids;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.search.search.SearchQuery;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
public class SearchQueryTest public class SearchQueryTest
{ {

View File

@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper; import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids; import io.druid.query.Druids;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryRunnerTestHelper;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -39,8 +38,8 @@ public class TimeBoundaryQueryTest
public void testQuerySerialization() throws IOException public void testQuerySerialization() throws IOException
{ {
Query query = Druids.newTimeBoundaryQueryBuilder() Query query = Druids.newTimeBoundaryQueryBuilder()
.dataSource("testing") .dataSource("testing")
.build(); .build();
String json = jsonMapper.writeValueAsString(query); String json = jsonMapper.writeValueAsString(query);
Query serdeQuery = jsonMapper.readValue(json, Query.class); Query serdeQuery = jsonMapper.readValue(json, Query.class);

View File

@ -54,8 +54,6 @@ import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static io.druid.query.QueryRunnerTestHelper.*;
/** /**
*/ */
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
@ -135,32 +133,32 @@ public class TopNQueryRunnerTest
new TopNResultValue( new TopNResultValue(
Arrays.<Map<String, Object>>asList( Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder() ImmutableMap.<String, Object>builder()
.put(providerDimension, "total_market") .put(providerDimension, "total_market")
.put("rows", 186L) .put("rows", 186L)
.put("index", 215679.82879638672D) .put("index", 215679.82879638672D)
.put("addRowsIndexConstant", 215866.82879638672D) .put("addRowsIndexConstant", 215866.82879638672D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2) .put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1743.9217529296875D) .put("maxIndex", 1743.9217529296875D)
.put("minIndex", 792.3260498046875D) .put("minIndex", 792.3260498046875D)
.build(), .build(),
ImmutableMap.<String, Object>builder() ImmutableMap.<String, Object>builder()
.put(providerDimension, "upfront") .put(providerDimension, "upfront")
.put("rows", 186L) .put("rows", 186L)
.put("index", 192046.1060180664D) .put("index", 192046.1060180664D)
.put("addRowsIndexConstant", 192233.1060180664D) .put("addRowsIndexConstant", 192233.1060180664D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2) .put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1870.06103515625D) .put("maxIndex", 1870.06103515625D)
.put("minIndex", 545.9906005859375D) .put("minIndex", 545.9906005859375D)
.build(), .build(),
ImmutableMap.<String, Object>builder() ImmutableMap.<String, Object>builder()
.put(providerDimension, "spot") .put(providerDimension, "spot")
.put("rows", 837L) .put("rows", 837L)
.put("index", 95606.57232284546D) .put("index", 95606.57232284546D)
.put("addRowsIndexConstant", 96444.57232284546D) .put("addRowsIndexConstant", 96444.57232284546D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_9) .put("uniques", QueryRunnerTestHelper.UNIQUES_9)
.put("maxIndex", 277.2735290527344D) .put("maxIndex", 277.2735290527344D)
.put("minIndex", 59.02102279663086D) .put("minIndex", 59.02102279663086D)
.build() .build()
) )
) )
) )
@ -199,32 +197,32 @@ public class TopNQueryRunnerTest
new TopNResultValue( new TopNResultValue(
Arrays.<Map<String, Object>>asList( Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder() ImmutableMap.<String, Object>builder()
.put(providerDimension, "total_market") .put(providerDimension, "total_market")
.put("rows", 186L) .put("rows", 186L)
.put("index", 215679.82879638672D) .put("index", 215679.82879638672D)
.put("addRowsIndexConstant", 215866.82879638672D) .put("addRowsIndexConstant", 215866.82879638672D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2) .put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1743.9217529296875D) .put("maxIndex", 1743.9217529296875D)
.put("minIndex", 792.3260498046875D) .put("minIndex", 792.3260498046875D)
.build(), .build(),
ImmutableMap.<String, Object>builder() ImmutableMap.<String, Object>builder()
.put(providerDimension, "upfront") .put(providerDimension, "upfront")
.put("rows", 186L) .put("rows", 186L)
.put("index", 192046.1060180664D) .put("index", 192046.1060180664D)
.put("addRowsIndexConstant", 192233.1060180664D) .put("addRowsIndexConstant", 192233.1060180664D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2) .put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1870.06103515625D) .put("maxIndex", 1870.06103515625D)
.put("minIndex", 545.9906005859375D) .put("minIndex", 545.9906005859375D)
.build(), .build(),
ImmutableMap.<String, Object>builder() ImmutableMap.<String, Object>builder()
.put(providerDimension, "spot") .put(providerDimension, "spot")
.put("rows", 837L) .put("rows", 837L)
.put("index", 95606.57232284546D) .put("index", 95606.57232284546D)
.put("addRowsIndexConstant", 96444.57232284546D) .put("addRowsIndexConstant", 96444.57232284546D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_9) .put("uniques", QueryRunnerTestHelper.UNIQUES_9)
.put("maxIndex", 277.2735290527344D) .put("maxIndex", 277.2735290527344D)
.put("minIndex", 59.02102279663086D) .put("minIndex", 59.02102279663086D)
.build() .build()
) )
) )
) )
@ -264,32 +262,32 @@ public class TopNQueryRunnerTest
new TopNResultValue( new TopNResultValue(
Arrays.<Map<String, Object>>asList( Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder() ImmutableMap.<String, Object>builder()
.put("provider", "spot") .put("provider", "spot")
.put("rows", 837L) .put("rows", 837L)
.put("index", 95606.57232284546D) .put("index", 95606.57232284546D)
.put("addRowsIndexConstant", 96444.57232284546D) .put("addRowsIndexConstant", 96444.57232284546D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_9) .put("uniques", QueryRunnerTestHelper.UNIQUES_9)
.put("maxIndex", 277.2735290527344D) .put("maxIndex", 277.2735290527344D)
.put("minIndex", 59.02102279663086D) .put("minIndex", 59.02102279663086D)
.build(), .build(),
ImmutableMap.<String, Object>builder() ImmutableMap.<String, Object>builder()
.put("provider", "total_market") .put("provider", "total_market")
.put("rows", 186L) .put("rows", 186L)
.put("index", 215679.82879638672D) .put("index", 215679.82879638672D)
.put("addRowsIndexConstant", 215866.82879638672D) .put("addRowsIndexConstant", 215866.82879638672D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2) .put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1743.9217529296875D) .put("maxIndex", 1743.9217529296875D)
.put("minIndex", 792.3260498046875D) .put("minIndex", 792.3260498046875D)
.build(), .build(),
ImmutableMap.<String, Object>builder() ImmutableMap.<String, Object>builder()
.put("provider", "upfront") .put("provider", "upfront")
.put("rows", 186L) .put("rows", 186L)
.put("index", 192046.1060180664D) .put("index", 192046.1060180664D)
.put("addRowsIndexConstant", 192233.1060180664D) .put("addRowsIndexConstant", 192233.1060180664D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2) .put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1870.06103515625D) .put("maxIndex", 1870.06103515625D)
.put("minIndex", 545.9906005859375D) .put("minIndex", 545.9906005859375D)
.build() .build()
) )
) )
) )
@ -697,18 +695,18 @@ public class TopNQueryRunnerTest
public void testTopNWithNonExistentFilterMultiDim() public void testTopNWithNonExistentFilterMultiDim()
{ {
AndDimFilter andDimFilter = Druids.newAndDimFilterBuilder() AndDimFilter andDimFilter = Druids.newAndDimFilterBuilder()
.fields( .fields(
Lists.<DimFilter>newArrayList( Lists.<DimFilter>newArrayList(
Druids.newSelectorDimFilterBuilder() Druids.newSelectorDimFilterBuilder()
.dimension(providerDimension) .dimension(providerDimension)
.value("billyblank") .value("billyblank")
.build(), .build(),
Druids.newSelectorDimFilterBuilder() Druids.newSelectorDimFilterBuilder()
.dimension(QueryRunnerTestHelper.qualityDimension) .dimension(QueryRunnerTestHelper.qualityDimension)
.value("mezzanine") .value("mezzanine")
.build() .build()
) )
).build(); ).build();
TopNQuery query = new TopNQueryBuilder() TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource) .dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran) .granularity(QueryRunnerTestHelper.allGran)

View File

@ -36,7 +36,13 @@ import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import static io.druid.query.QueryRunnerTestHelper.*; import static io.druid.query.QueryRunnerTestHelper.addRowsIndexConstant;
import static io.druid.query.QueryRunnerTestHelper.allGran;
import static io.druid.query.QueryRunnerTestHelper.commonAggregators;
import static io.druid.query.QueryRunnerTestHelper.dataSource;
import static io.druid.query.QueryRunnerTestHelper.fullOnInterval;
import static io.druid.query.QueryRunnerTestHelper.indexMetric;
import static io.druid.query.QueryRunnerTestHelper.providerDimension;
public class TopNQueryTest public class TopNQueryTest
{ {

View File

@ -25,12 +25,16 @@ import com.google.common.collect.Ordering;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import io.druid.client.selector.ServerSelector;
import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.client.selector.ServerSelectorStrategy; import io.druid.client.selector.ServerSelectorStrategy;
import io.druid.concurrent.Execs; import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Client; import io.druid.guice.annotations.Client;
import io.druid.query.*; import io.druid.query.DataSource;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.TableDataSource;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk; import io.druid.timeline.partition.PartitionChunk;
@ -235,13 +239,12 @@ public class BrokerServerView implements TimelineServerView
{ {
String table; String table;
while (dataSource instanceof QueryDataSource) { while (dataSource instanceof QueryDataSource) {
dataSource = ((QueryDataSource)dataSource).getQuery().getDataSource(); dataSource = ((QueryDataSource) dataSource).getQuery().getDataSource();
} }
if (dataSource instanceof TableDataSource) { if (dataSource instanceof TableDataSource) {
table = ((TableDataSource)dataSource).getName(); table = ((TableDataSource) dataSource).getName();
} } else {
else {
throw new UnsupportedOperationException("Unsupported data source type: " + dataSource.getClass().getSimpleName()); throw new UnsupportedOperationException("Unsupported data source type: " + dataSource.getClass().getSimpleName());
} }

View File

@ -40,8 +40,8 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.client.cache.Cache; import io.druid.client.cache.Cache;
import io.druid.client.selector.ServerSelector;
import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.guice.annotations.Smile; import io.druid.guice.annotations.Smile;
import io.druid.query.BySegmentResultValueClass; import io.druid.query.BySegmentResultValueClass;
import io.druid.query.CacheStrategy; import io.druid.query.CacheStrategy;
@ -124,7 +124,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final boolean useCache = Boolean.parseBoolean(query.getContextValue("useCache", "true")) && strategy != null; final boolean useCache = Boolean.parseBoolean(query.getContextValue("useCache", "true")) && strategy != null;
final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true")) final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true"))
&& strategy != null; && strategy != null;
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false")); final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
@ -286,8 +286,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
objectMapper.getFactory().createParser(cachedResult), objectMapper.getFactory().createParser(cachedResult),
cacheObjectClazz cacheObjectClazz
); );
} } catch (IOException e) {
catch (IOException e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
} }
@ -340,7 +339,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
CachePopulator cachePopulator = cachePopulatorMap.get( CachePopulator cachePopulator = cachePopulatorMap.get(
String.format("%s_%s", segmentIdentifier, value.getInterval()) String.format("%s_%s", segmentIdentifier, value.getInterval())
); );
if(cachePopulator != null) { if (cachePopulator != null) {
cachePopulator.populate(Iterables.transform(segmentResults, prepareForCache)); cachePopulator.populate(Iterables.transform(segmentResults, prepareForCache));
} }
@ -425,8 +424,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
} }
cache.put(key, valueBytes); cache.put(key, valueBytes);
} } catch (IOException e) {
catch (IOException e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
} }

View File

@ -30,11 +30,19 @@ import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.query.*; import io.druid.query.DataSource;
import io.druid.segment.ReferenceCountingSegment; import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;
import io.druid.query.TableDataSource;
import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.Sink; import io.druid.segment.realtime.plumber.Sink;
import io.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.joda.time.Period; import org.joda.time.Period;
@ -90,6 +98,7 @@ public class RealtimeManager implements QuerySegmentWalker
Closeables.closeQuietly(chief); Closeables.closeQuietly(chief);
} }
} }
public FireDepartmentMetrics getMetrics(String datasource) public FireDepartmentMetrics getMetrics(String datasource)
{ {
FireChief chief = chiefs.get(datasource); FireChief chief = chiefs.get(datasource);
@ -124,9 +133,8 @@ public class RealtimeManager implements QuerySegmentWalker
String dataSourceName; String dataSourceName;
try { try {
dataSourceName = ((TableDataSource)query.getDataSource()).getName(); dataSourceName = ((TableDataSource) query.getDataSource()).getName();
} } catch (ClassCastException e) {
catch (ClassCastException e) {
throw new UnsupportedOperationException("Subqueries are only supported in the broker"); throw new UnsupportedOperationException("Subqueries are only supported in the broker");
} }
return dataSourceName; return dataSourceName;
@ -164,8 +172,7 @@ public class RealtimeManager implements QuerySegmentWalker
log.info("Someone get us a plumber!"); log.info("Someone get us a plumber!");
plumber = fireDepartment.findPlumber(); plumber = fireDepartment.findPlumber();
log.info("We have our plumber!"); log.info("We have our plumber!");
} } catch (IOException e) {
catch (IOException e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
} }
@ -192,8 +199,7 @@ public class RealtimeManager implements QuerySegmentWalker
try { try {
try { try {
inputRow = firehose.nextRow(); inputRow = firehose.nextRow();
} } catch (Exception e) {
catch (Exception e) {
log.debug(e, "thrown away line due to exception, considering unparseable"); log.debug(e, "thrown away line due to exception, considering unparseable");
metrics.incrementUnparseable(); metrics.incrementUnparseable();
continue; continue;
@ -218,8 +224,7 @@ public class RealtimeManager implements QuerySegmentWalker
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
} }
metrics.incrementProcessed(); metrics.incrementProcessed();
} } catch (FormattedException e) {
catch (FormattedException e) {
log.info(e, "unparseable line: %s", e.getDetails()); log.info(e, "unparseable line: %s", e.getDetails());
metrics.incrementUnparseable(); metrics.incrementUnparseable();
continue; continue;
@ -227,16 +232,15 @@ public class RealtimeManager implements QuerySegmentWalker
} }
} catch (RuntimeException e) { } catch (RuntimeException e) {
log.makeAlert(e, "RuntimeException aborted realtime processing[%s]", fireDepartment.getSchema().getDataSource()) log.makeAlert(e, "RuntimeException aborted realtime processing[%s]", fireDepartment.getSchema().getDataSource())
.emit(); .emit();
normalExit = false; normalExit = false;
throw e; throw e;
} catch (Error e) { } catch (Error e) {
log.makeAlert(e, "Exception aborted realtime processing[%s]", fireDepartment.getSchema().getDataSource()) log.makeAlert(e, "Exception aborted realtime processing[%s]", fireDepartment.getSchema().getDataSource())
.emit(); .emit();
normalExit = false; normalExit = false;
throw e; throw e;
} } finally {
finally {
Closeables.closeQuietly(firehose); Closeables.closeQuietly(firehose);
if (normalExit) { if (normalExit) {
plumber.finishJob(); plumber.finishJob();

View File

@ -30,7 +30,20 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.CountingMap; import io.druid.collections.CountingMap;
import io.druid.guice.annotations.Processing; import io.druid.guice.annotations.Processing;
import io.druid.query.*; import io.druid.query.BySegmentQueryRunner;
import io.druid.query.DataSource;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.ReferenceCountingSegmentQueryRunner;
import io.druid.query.SegmentDescriptor;
import io.druid.query.TableDataSource;
import io.druid.query.spec.QuerySegmentSpec; import io.druid.query.spec.QuerySegmentSpec;
import io.druid.query.spec.SpecificSegmentQueryRunner; import io.druid.query.spec.SpecificSegmentQueryRunner;
import io.druid.query.spec.SpecificSegmentSpec; import io.druid.query.spec.SpecificSegmentSpec;
@ -227,9 +240,8 @@ public class ServerManager implements QuerySegmentWalker
String dataSourceName; String dataSourceName;
try { try {
dataSourceName = ((TableDataSource)query.getDataSource()).getName(); dataSourceName = ((TableDataSource) query.getDataSource()).getName();
} } catch (ClassCastException e) {
catch (ClassCastException e) {
throw new UnsupportedOperationException("Subqueries are only supported in the broker"); throw new UnsupportedOperationException("Subqueries are only supported in the broker");
} }
@ -313,9 +325,8 @@ public class ServerManager implements QuerySegmentWalker
String dataSourceName; String dataSourceName;
try { try {
dataSourceName = ((TableDataSource)query.getDataSource()).getName(); dataSourceName = ((TableDataSource) query.getDataSource()).getName();
} } catch (ClassCastException e) {
catch (ClassCastException e) {
throw new UnsupportedOperationException("Subqueries are only supported in the broker"); throw new UnsupportedOperationException("Subqueries are only supported in the broker");
} }

View File

@ -23,7 +23,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.collect.*; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.MergeIterable; import com.metamx.common.guava.MergeIterable;
@ -38,7 +43,17 @@ import io.druid.client.selector.ServerSelector;
import io.druid.granularity.PeriodGranularity; import io.druid.granularity.PeriodGranularity;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper; import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.*; import io.druid.query.BySegmentResultValueClass;
import io.druid.query.DataSource;
import io.druid.query.Druids;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.query.SegmentDescriptor;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory;
@ -58,11 +73,19 @@ import io.druid.query.timeboundary.TimeBoundaryResultValue;
import io.druid.query.timeseries.TimeseriesQuery; import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesResultValue; import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.query.topn.*; import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNQueryBuilder;
import io.druid.query.topn.TopNQueryConfig;
import io.druid.query.topn.TopNQueryQueryToolChest;
import io.druid.query.topn.TopNResultValue;
import io.druid.segment.TestHelper; import io.druid.segment.TestHelper;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.*; import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.SingleElementPartitionChunk;
import io.druid.timeline.partition.StringPartitionChunk;
import org.easymock.Capture; import org.easymock.Capture;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -77,7 +100,13 @@ import org.junit.runners.Parameterized;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
/** /**
@ -184,13 +213,13 @@ public class CachingClusteredClientTest
public void testTimeseriesCaching() throws Exception public void testTimeseriesCaching() throws Exception
{ {
final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE) .dataSource(DATA_SOURCE)
.intervals(SEG_SPEC) .intervals(SEG_SPEC)
.filters(DIM_FILTER) .filters(DIM_FILTER)
.granularity(GRANULARITY) .granularity(GRANULARITY)
.aggregators(AGGS) .aggregators(AGGS)
.postAggregators(POST_AGGS) .postAggregators(POST_AGGS)
.context(CONTEXT); .context(CONTEXT);
testQueryCaching( testQueryCaching(
builder.build(), builder.build(),
@ -235,9 +264,9 @@ public class CachingClusteredClientTest
), ),
client.run( client.run(
builder.intervals("2011-01-01/2011-01-10") builder.intervals("2011-01-01/2011-01-10")
.aggregators(RENAMED_AGGS) .aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS) .postAggregators(RENAMED_POST_AGGS)
.build() .build()
) )
); );
} }
@ -247,13 +276,13 @@ public class CachingClusteredClientTest
public void testTimeseriesCachingTimeZone() throws Exception public void testTimeseriesCachingTimeZone() throws Exception
{ {
final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE) .dataSource(DATA_SOURCE)
.intervals(SEG_SPEC) .intervals(SEG_SPEC)
.filters(DIM_FILTER) .filters(DIM_FILTER)
.granularity(PT1H_TZ_GRANULARITY) .granularity(PT1H_TZ_GRANULARITY)
.aggregators(AGGS) .aggregators(AGGS)
.postAggregators(POST_AGGS) .postAggregators(POST_AGGS)
.context(CONTEXT); .context(CONTEXT);
testQueryCaching( testQueryCaching(
builder.build(), builder.build(),
@ -275,9 +304,9 @@ public class CachingClusteredClientTest
), ),
client.run( client.run(
builder.intervals("2011-11-04/2011-11-08") builder.intervals("2011-11-04/2011-11-08")
.aggregators(RENAMED_AGGS) .aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS) .postAggregators(RENAMED_POST_AGGS)
.build() .build()
) )
); );
} }
@ -286,18 +315,18 @@ public class CachingClusteredClientTest
public void testDisableUseCache() throws Exception public void testDisableUseCache() throws Exception
{ {
final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE) .dataSource(DATA_SOURCE)
.intervals(SEG_SPEC) .intervals(SEG_SPEC)
.filters(DIM_FILTER) .filters(DIM_FILTER)
.granularity(GRANULARITY) .granularity(GRANULARITY)
.aggregators(AGGS) .aggregators(AGGS)
.postAggregators(POST_AGGS); .postAggregators(POST_AGGS);
testQueryCaching( testQueryCaching(
1, 1,
true, true,
builder.context(ImmutableMap.of("useCache", "false", builder.context(ImmutableMap.of("useCache", "false",
"populateCache", "true")).build(), "populateCache", "true")).build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
); );
@ -311,7 +340,7 @@ public class CachingClusteredClientTest
1, 1,
false, false,
builder.context(ImmutableMap.of("useCache", "false", builder.context(ImmutableMap.of("useCache", "false",
"populateCache", "false")).build(), "populateCache", "false")).build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
); );
@ -323,7 +352,7 @@ public class CachingClusteredClientTest
1, 1,
false, false,
builder.context(ImmutableMap.of("useCache", "true", builder.context(ImmutableMap.of("useCache", "true",
"populateCache", "false")).build(), "populateCache", "false")).build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
); );
@ -392,10 +421,10 @@ public class CachingClusteredClientTest
), ),
client.run( client.run(
builder.intervals("2011-01-01/2011-01-10") builder.intervals("2011-01-01/2011-01-10")
.metric("imps") .metric("imps")
.aggregators(RENAMED_AGGS) .aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS) .postAggregators(RENAMED_POST_AGGS)
.build() .build()
) )
); );
} }
@ -437,10 +466,10 @@ public class CachingClusteredClientTest
), ),
client.run( client.run(
builder.intervals("2011-11-04/2011-11-08") builder.intervals("2011-11-04/2011-11-08")
.metric("imps") .metric("imps")
.aggregators(RENAMED_AGGS) .aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS) .postAggregators(RENAMED_POST_AGGS)
.build() .build()
) )
); );
} }
@ -503,10 +532,10 @@ public class CachingClusteredClientTest
), ),
client.run( client.run(
builder.intervals("2011-01-01/2011-01-10") builder.intervals("2011-01-01/2011-01-10")
.metric("imps") .metric("imps")
.aggregators(RENAMED_AGGS) .aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS) .postAggregators(RENAMED_POST_AGGS)
.build() .build()
) )
); );
} }
@ -552,7 +581,8 @@ public class CachingClusteredClientTest
); );
} }
public void testQueryCaching(final Query query, Object... args) { public void testQueryCaching(final Query query, Object... args)
{
testQueryCaching(3, true, query, args); testQueryCaching(3, true, query, args);
} }
@ -607,8 +637,8 @@ public class CachingClusteredClientTest
EasyMock.expect(serverView.getQueryRunner(server)) EasyMock.expect(serverView.getQueryRunner(server))
.andReturn(expectations.getQueryRunner()) .andReturn(expectations.getQueryRunner())
.once(); .once();
final Capture<? extends Query> capture = new Capture(); final Capture<? extends Query> capture = new Capture();
queryCaptures.add(capture); queryCaptures.add(capture);
@ -625,8 +655,8 @@ public class CachingClusteredClientTest
} }
EasyMock.expect(queryable.run(EasyMock.capture(capture))) EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results)) .andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results))
.once(); .once();
} else if (query instanceof TopNQuery) { } else if (query instanceof TopNQuery) {
List<String> segmentIds = Lists.newArrayList(); List<String> segmentIds = Lists.newArrayList();
@ -638,8 +668,8 @@ public class CachingClusteredClientTest
results.add(expectation.getResults()); results.add(expectation.getResults());
} }
EasyMock.expect(queryable.run(EasyMock.capture(capture))) EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableTopNResults(segmentIds, intervals, results)) .andReturn(toQueryableTopNResults(segmentIds, intervals, results))
.once(); .once();
} else if (query instanceof SearchQuery) { } else if (query instanceof SearchQuery) {
List<String> segmentIds = Lists.newArrayList(); List<String> segmentIds = Lists.newArrayList();
List<Interval> intervals = Lists.newArrayList(); List<Interval> intervals = Lists.newArrayList();
@ -650,8 +680,8 @@ public class CachingClusteredClientTest
results.add(expectation.getResults()); results.add(expectation.getResults());
} }
EasyMock.expect(queryable.run(EasyMock.capture(capture))) EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableSearchResults(segmentIds, intervals, results)) .andReturn(toQueryableSearchResults(segmentIds, intervals, results))
.once(); .once();
} else if (query instanceof TimeBoundaryQuery) { } else if (query instanceof TimeBoundaryQuery) {
List<String> segmentIds = Lists.newArrayList(); List<String> segmentIds = Lists.newArrayList();
List<Interval> intervals = Lists.newArrayList(); List<Interval> intervals = Lists.newArrayList();
@ -662,8 +692,8 @@ public class CachingClusteredClientTest
results.add(expectation.getResults()); results.add(expectation.getResults());
} }
EasyMock.expect(queryable.run(EasyMock.capture(capture))) EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results)) .andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results))
.once(); .once();
} else { } else {
throw new ISE("Unknown query type[%s]", query.getClass()); throw new ISE("Unknown query type[%s]", query.getClass());
} }
@ -730,13 +760,12 @@ public class CachingClusteredClientTest
// make sure all the queries were sent down as 'bySegment' // make sure all the queries were sent down as 'bySegment'
for (Capture queryCapture : queryCaptures) { for (Capture queryCapture : queryCaptures) {
Query capturedQuery = (Query) queryCapture.getValue(); Query capturedQuery = (Query) queryCapture.getValue();
if(expectBySegment) { if (expectBySegment) {
Assert.assertEquals("true", capturedQuery.getContextValue("bySegment")); Assert.assertEquals("true", capturedQuery.getContextValue("bySegment"));
} } else {
else {
Assert.assertTrue( Assert.assertTrue(
capturedQuery.getContextValue("bySegment") == null || capturedQuery.getContextValue("bySegment") == null ||
capturedQuery.getContextValue("bySegment").equals("false") capturedQuery.getContextValue("bySegment").equals("false")
); );
} }
} }
@ -791,7 +820,8 @@ public class CachingClusteredClientTest
} }
timeline.add(queryIntervals.get(k), String.valueOf(k), chunk); timeline.add(queryIntervals.get(k), String.valueOf(k), chunk);
} }
} return serverExpectationList; }
return serverExpectationList;
} }
private Sequence<Result<TimeseriesResultValue>> toQueryableTimeseriesResults( private Sequence<Result<TimeseriesResultValue>> toQueryableTimeseriesResults(
@ -801,35 +831,35 @@ public class CachingClusteredClientTest
Iterable<Iterable<Result<TimeseriesResultValue>>> results Iterable<Iterable<Result<TimeseriesResultValue>>> results
) )
{ {
if(bySegment) { if (bySegment) {
return Sequences.simple( return Sequences.simple(
FunctionalIterable FunctionalIterable
.create(segmentIds) .create(segmentIds)
.trinaryTransform( .trinaryTransform(
intervals, intervals,
results, results,
new TrinaryFn<String, Interval, Iterable<Result<TimeseriesResultValue>>, Result<TimeseriesResultValue>>() new TrinaryFn<String, Interval, Iterable<Result<TimeseriesResultValue>>, Result<TimeseriesResultValue>>()
{
@Override
@SuppressWarnings("unchecked")
public Result<TimeseriesResultValue> apply(
final String segmentId,
final Interval interval,
final Iterable<Result<TimeseriesResultValue>> results
)
{ {
return new Result( @Override
results.iterator().next().getTimestamp(), @SuppressWarnings("unchecked")
new BySegmentResultValueClass( public Result<TimeseriesResultValue> apply(
Lists.newArrayList(results), final String segmentId,
segmentId, final Interval interval,
interval final Iterable<Result<TimeseriesResultValue>> results
) )
); {
return new Result(
results.iterator().next().getTimestamp(),
new BySegmentResultValueClass(
Lists.newArrayList(results),
segmentId,
interval
)
);
}
} }
} )
) );
);
} else { } else {
return Sequences.simple(Iterables.concat(results)); return Sequences.simple(Iterables.concat(results));
} }
@ -967,36 +997,36 @@ public class CachingClusteredClientTest
} }
private Iterable<BySegmentResultValueClass<TimeseriesResultValue>> makeBySegmentTimeResults private Iterable<BySegmentResultValueClass<TimeseriesResultValue>> makeBySegmentTimeResults
(Object... objects) (Object... objects)
{ {
if (objects.length % 5 != 0) { if (objects.length % 5 != 0) {
throw new ISE("makeTimeResults must be passed arguments in groups of 5, got[%d]", objects.length); throw new ISE("makeTimeResults must be passed arguments in groups of 5, got[%d]", objects.length);
}
List<BySegmentResultValueClass<TimeseriesResultValue>> retVal = Lists.newArrayListWithCapacity(objects.length / 5);
for (int i = 0; i < objects.length; i += 5) {
retVal.add(
new BySegmentResultValueClass<TimeseriesResultValue>(
Lists.newArrayList(
new TimeseriesResultValue(
ImmutableMap.of(
"rows", objects[i + 1],
"imps", objects[i + 2],
"impers", objects[i + 2],
"avg_imps_per_row",
((Number) objects[i + 2]).doubleValue() / ((Number) objects[i + 1]).doubleValue()
)
)
),
(String)objects[i+3],
(Interval)objects[i+4]
)
);
}
return retVal;
} }
List<BySegmentResultValueClass<TimeseriesResultValue>> retVal = Lists.newArrayListWithCapacity(objects.length / 5);
for (int i = 0; i < objects.length; i += 5) {
retVal.add(
new BySegmentResultValueClass<TimeseriesResultValue>(
Lists.newArrayList(
new TimeseriesResultValue(
ImmutableMap.of(
"rows", objects[i + 1],
"imps", objects[i + 2],
"impers", objects[i + 2],
"avg_imps_per_row",
((Number) objects[i + 2]).doubleValue() / ((Number) objects[i + 1]).doubleValue()
)
)
),
(String) objects[i + 3],
(Interval) objects[i + 4]
)
);
}
return retVal;
}
private Iterable<Result<TimeseriesResultValue>> makeRenamedTimeResults private Iterable<Result<TimeseriesResultValue>> makeRenamedTimeResults
(Object... objects) (Object... objects)
{ {
@ -1129,13 +1159,13 @@ public class CachingClusteredClientTest
return new CachingClusteredClient( return new CachingClusteredClient(
new MapQueryToolChestWarehouse( new MapQueryToolChestWarehouse(
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder() ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
.put( .put(
TimeseriesQuery.class, TimeseriesQuery.class,
new TimeseriesQueryQueryToolChest(new QueryConfig()) new TimeseriesQueryQueryToolChest(new QueryConfig())
) )
.put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig())) .put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig()))
.put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig())) .put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig()))
.build() .build()
), ),
new TimelineServerView() new TimelineServerView()
{ {