Merge branch 'master' of github.com:metamx/druid into cleanup-ingest

fjy 2014-07-17 14:38:54 -07:00
commit 76f096e667
47 changed files with 1412 additions and 250 deletions

View File

@ -220,7 +220,7 @@ Kill tasks delete all information about a segment and removes it from deep stora
"type": "kill",
"id": <task_id>,
"dataSource": <task_datasource>,
"segments": <JSON list of DataSegment objects to append>
"interval" : <all_segments_in_this_interval_will_die!>
}
```

View File

@ -41,7 +41,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.26.5</metamx.java-util.version>
<apache.curator.version>2.5.0</apache.curator.version>
<druid.api.version>0.2.5</druid.api.version>
<druid.api.version>0.2.6</druid.api.version>
</properties>
<modules>

View File

@ -35,6 +35,8 @@ import io.druid.query.search.SearchResultValue;
import io.druid.query.search.search.InsensitiveContainsSearchQuerySpec;
import io.druid.query.search.search.SearchQuery;
import io.druid.query.search.search.SearchQuerySpec;
import io.druid.query.select.PagingSpec;
import io.druid.query.select.SelectQuery;
import io.druid.query.spec.LegacySegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.query.timeboundary.TimeBoundaryQuery;
@ -943,4 +945,158 @@ public class Druids
{
return new SegmentMetadataQueryBuilder();
}
/**
* A Builder for SelectQuery.
* <p/>
* Required: dataSource(), intervals() must be called before build()
* <p/>
* Usage example:
* <pre><code>
* SelectQuery query = new SelectQueryBuilder()
* .dataSource("Example")
* .interval("2010/2013")
* .build();
* </code></pre>
*
* @see io.druid.query.select.SelectQuery
*/
public static class SelectQueryBuilder
{
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
private Map<String, Object> context;
private DimFilter dimFilter;
private QueryGranularity granularity;
private List<String> dimensions;
private List<String> metrics;
private PagingSpec pagingSpec;
public SelectQueryBuilder()
{
dataSource = null;
querySegmentSpec = null;
context = null;
dimFilter = null;
granularity = QueryGranularity.ALL;
dimensions = Lists.newArrayList();
metrics = Lists.newArrayList();
pagingSpec = null;
}
public SelectQuery build()
{
return new SelectQuery(
dataSource,
querySegmentSpec,
dimFilter,
granularity,
dimensions,
metrics,
pagingSpec,
context
);
}
public SelectQueryBuilder copy(SelectQueryBuilder builder)
{
return new SelectQueryBuilder()
.dataSource(builder.dataSource)
.intervals(builder.querySegmentSpec)
.context(builder.context);
}
public SelectQueryBuilder dataSource(String ds)
{
dataSource = new TableDataSource(ds);
return this;
}
public SelectQueryBuilder dataSource(DataSource ds)
{
dataSource = ds;
return this;
}
public SelectQueryBuilder intervals(QuerySegmentSpec q)
{
querySegmentSpec = q;
return this;
}
public SelectQueryBuilder intervals(String s)
{
querySegmentSpec = new LegacySegmentSpec(s);
return this;
}
public SelectQueryBuilder intervals(List<Interval> l)
{
querySegmentSpec = new LegacySegmentSpec(l);
return this;
}
public SelectQueryBuilder context(Map<String, Object> c)
{
context = c;
return this;
}
public SelectQueryBuilder filters(String dimensionName, String value)
{
dimFilter = new SelectorDimFilter(dimensionName, value);
return this;
}
public SelectQueryBuilder filters(String dimensionName, String value, String... values)
{
List<DimFilter> fields = Lists.<DimFilter>newArrayList(new SelectorDimFilter(dimensionName, value));
for (String val : values) {
fields.add(new SelectorDimFilter(dimensionName, val));
}
dimFilter = new OrDimFilter(fields);
return this;
}
public SelectQueryBuilder filters(DimFilter f)
{
dimFilter = f;
return this;
}
public SelectQueryBuilder granularity(String g)
{
granularity = QueryGranularity.fromString(g);
return this;
}
public SelectQueryBuilder granularity(QueryGranularity g)
{
granularity = g;
return this;
}
public SelectQueryBuilder dimensions(List<String> d)
{
dimensions = d;
return this;
}
public SelectQueryBuilder metrics(List<String> m)
{
metrics = m;
return this;
}
public SelectQueryBuilder pagingSpec(PagingSpec p)
{
pagingSpec = p;
return this;
}
}
public static SelectQueryBuilder newSelectQueryBuilder()
{
return new SelectQueryBuilder();
}
}
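A hedged usage sketch of the builder added above, using only the setters visible in this diff. The data source, interval, and column names are placeholders, and pagingSpec is left at its default because PagingSpec's constructor does not appear in this hunk:

```java
import com.google.common.collect.Lists;

import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.select.SelectQuery;

public class SelectQueryBuilderExample
{
  public static SelectQuery example()
  {
    // Sketch only: "wikipedia" and the column names are placeholders.
    return Druids.newSelectQueryBuilder()
                 .dataSource("wikipedia")            // wrapped in a TableDataSource
                 .intervals("2013-01-01/2013-01-02") // becomes a LegacySegmentSpec
                 .dimensions(Lists.newArrayList("page", "user"))
                 .metrics(Lists.newArrayList("added"))
                 .granularity(QueryGranularity.ALL)  // the builder's default
                 .build();                           // pagingSpec stays null here
  }
}
```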

View File

@ -25,7 +25,6 @@ import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@ -41,7 +40,6 @@ import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryHelper;
import io.druid.segment.incremental.IncrementalIndex;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
@ -50,72 +48,66 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class GroupByParallelQueryRunner implements QueryRunner<Row>
public class GroupByParallelQueryRunner<T> implements QueryRunner<T>
{
private static final Logger log = new Logger(GroupByParallelQueryRunner.class);
private final Iterable<QueryRunner<Row>> queryables;
private final Iterable<QueryRunner<T>> queryables;
private final ListeningExecutorService exec;
private final Ordering<Row> ordering;
private final Supplier<GroupByQueryConfig> configSupplier;
private final QueryWatcher queryWatcher;
public GroupByParallelQueryRunner(
ExecutorService exec,
Ordering<Row> ordering,
Supplier<GroupByQueryConfig> configSupplier,
QueryWatcher queryWatcher,
QueryRunner<Row>... queryables
)
{
this(exec, ordering, configSupplier, queryWatcher, Arrays.asList(queryables));
}
public GroupByParallelQueryRunner(
ExecutorService exec,
Ordering<Row> ordering, Supplier<GroupByQueryConfig> configSupplier,
QueryWatcher queryWatcher,
Iterable<QueryRunner<Row>> queryables
Iterable<QueryRunner<T>> queryables
)
{
this.exec = MoreExecutors.listeningDecorator(exec);
this.ordering = ordering;
this.queryWatcher = queryWatcher;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.configSupplier = configSupplier;
}
@Override
public Sequence<Row> run(final Query<Row> queryParam)
public Sequence<T> run(final Query<T> queryParam)
{
final GroupByQuery query = (GroupByQuery) queryParam;
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query,
configSupplier.get()
);
final Pair<List, Accumulator<List, T>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
final boolean bySegment = query.getContextBySegment(false);
final int priority = query.getContextPriority(0);
if (Iterables.isEmpty(queryables)) {
log.warn("No queryables found.");
}
ListenableFuture<List<Boolean>> futures = Futures.allAsList(
ListenableFuture<List<Void>> futures = Futures.allAsList(
Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<Row>, ListenableFuture<Boolean>>()
new Function<QueryRunner<T>, ListenableFuture<Void>>()
{
@Override
public ListenableFuture<Boolean> apply(final QueryRunner<Row> input)
public ListenableFuture<Void> apply(final QueryRunner<T> input)
{
return exec.submit(
new AbstractPrioritizedCallable<Boolean>(priority)
new AbstractPrioritizedCallable<Void>(priority)
{
@Override
public Boolean call() throws Exception
public Void call() throws Exception
{
try {
input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
return true;
if (bySegment) {
input.run(queryParam)
.accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs);
} else {
input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
}
return null;
}
catch (QueryInterruptedException e) {
throw Throwables.propagate(e);
@ -137,7 +129,7 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
try {
queryWatcher.registerQuery(query, futures);
final Number timeout = query.getContextValue("timeout", (Number) null);
if(timeout == null) {
if (timeout == null) {
futures.get();
} else {
futures.get(timeout.longValue(), TimeUnit.MILLISECONDS);
@ -148,10 +140,10 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
futures.cancel(true);
throw new QueryInterruptedException("Query interrupted");
}
catch(CancellationException e) {
catch (CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
}
catch(TimeoutException e) {
catch (TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query timeout");
@ -160,7 +152,22 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
throw Throwables.propagate(e.getCause());
}
return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations(null));
}
if (bySegment) {
return Sequences.simple(bySegmentAccumulatorPair.lhs);
}
return Sequences.simple(
Iterables.transform(
indexAccumulatorPair.lhs.iterableWithPostAggregations(null),
new Function<Row, T>()
{
@Override
public T apply(Row input)
{
return (T) input;
}
}
)
);
}
}
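The runner now fans each sub-runner out as a prioritized callable that accumulates into a shared structure (the merged index, or the by-segment list) and returns Void, then blocks on the combined future with the query's optional timeout. A self-contained sketch of that fan-out/accumulate/timeout pattern using plain Guava and JDK types rather than Druid classes:

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ParallelAccumulateSketch
{
  public static void main(String[] args) throws Exception
  {
    final ListeningExecutorService exec =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));
    // Shared structure standing in for the IncrementalIndex / by-segment list
    // that the real runner accumulates into.
    final Queue<Integer> accumulated = new ConcurrentLinkedQueue<>();

    // One callable per sub-runner; each accumulates and returns null, mirroring
    // the switch from ListenableFuture<Boolean> to ListenableFuture<Void> above.
    List<ListenableFuture<Void>> workers = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
      final int shard = i;
      workers.add(
          exec.submit(
              new Callable<Void>()
              {
                @Override
                public Void call()
                {
                  accumulated.add(shard * shard);
                  return null;
                }
              }
          )
      );
    }

    // Wait for every sub-result, applying an overall timeout the way the
    // runner applies the query's "timeout" context value.
    ListenableFuture<List<Void>> all = Futures.allAsList(workers);
    try {
      all.get(10, TimeUnit.SECONDS);
    }
    catch (TimeoutException e) {
      all.cancel(true);
      throw e;
    }

    System.out.println(accumulated);
    exec.shutdown();
  }
}
```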

View File

@ -41,13 +41,17 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
* This method doesn't belong here, but it's here for now just to make it work.
*
* @param seqOfSequences
*
* @return
*/
public abstract Sequence<ResultType> mergeSequences(Sequence<Sequence<ResultType>> seqOfSequences);
public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query);
public abstract Function<ResultType, ResultType> makePreComputeManipulatorFn(QueryType query, MetricManipulationFn fn);
public abstract Function<ResultType, ResultType> makePreComputeManipulatorFn(
QueryType query,
MetricManipulationFn fn
);
public Function<ResultType, ResultType> makePostComputeManipulatorFn(QueryType query, MetricManipulationFn fn)
{
@ -56,19 +60,23 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
public abstract TypeReference<ResultType> getResultTypeReference();
public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query) {
public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query)
{
return null;
}
public QueryRunner<ResultType> preMergeQueryDecoration(QueryRunner<ResultType> runner) {
public QueryRunner<ResultType> preMergeQueryDecoration(QueryRunner<ResultType> runner)
{
return runner;
}
public QueryRunner<ResultType> postMergeQueryDecoration(QueryRunner<ResultType> runner) {
public QueryRunner<ResultType> postMergeQueryDecoration(QueryRunner<ResultType> runner)
{
return runner;
}
public <T extends LogicalSegment> List<T> filterSegments(QueryType query, List<T> segments) {
public <T extends LogicalSegment> List<T> filterSegments(QueryType query, List<T> segments)
{
return segments;
}
}

View File

@ -51,7 +51,6 @@ import io.druid.query.groupby.orderby.OrderByColumnSpec;
import io.druid.query.spec.LegacySegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
@ -111,14 +110,14 @@ public class GroupByQuery extends BaseQuery<Row>
new Function<Sequence<Row>, Sequence<Row>>()
{
@Override
public Sequence<Row> apply(@Nullable Sequence<Row> input)
public Sequence<Row> apply(Sequence<Row> input)
{
return Sequences.filter(
input,
new Predicate<Row>()
{
@Override
public boolean apply(@Nullable Row input)
public boolean apply(Row input)
{
return GroupByQuery.this.havingSpec.eval(input);
}

View File

@ -81,7 +81,7 @@ public class GroupByQueryEngine
this.intermediateResultsBufferPool = intermediateResultsBufferPool;
}
public Sequence<Row> process(final GroupByQuery query, StorageAdapter storageAdapter)
public Sequence<Row> process(final GroupByQuery query, final StorageAdapter storageAdapter)
{
if (storageAdapter == null) {
throw new ISE(
@ -104,41 +104,41 @@ public class GroupByQueryEngine
return Sequences.concat(
Sequences.withBaggage(
Sequences.map(
cursors,
new Function<Cursor, Sequence<Row>>()
{
@Override
public Sequence<Row> apply(@Nullable final Cursor cursor)
{
return new BaseSequence<Row, RowIterator>(
new BaseSequence.IteratorMaker<Row, RowIterator>()
Sequences.map(
cursors,
new Function<Cursor, Sequence<Row>>()
{
@Override
public Sequence<Row> apply(final Cursor cursor)
{
@Override
public RowIterator make()
{
return new RowIterator(query, cursor, bufferHolder.get(), config.get());
}
return new BaseSequence<>(
new BaseSequence.IteratorMaker<Row, RowIterator>()
{
@Override
public RowIterator make()
{
return new RowIterator(query, cursor, bufferHolder.get(), config.get());
}
@Override
public void cleanup(RowIterator iterFromMake)
{
CloseQuietly.close(iterFromMake);
}
@Override
public void cleanup(RowIterator iterFromMake)
{
CloseQuietly.close(iterFromMake);
}
}
);
}
);
}
),
new Closeable()
{
@Override
public void close() throws IOException
{
CloseQuietly.close(bufferHolder);
}
}
}
),
new Closeable()
{
@Override
public void close() throws IOException
{
CloseQuietly.close(bufferHolder);
}
}
)
)
);
}

View File

@ -24,21 +24,18 @@ import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import javax.annotation.Nullable;
import java.util.List;
public class GroupByQueryHelper
{
public static Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> createIndexAccumulatorPair(
public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> createIndexAccumulatorPair(
final GroupByQuery query,
final GroupByQueryConfig config
)
@ -80,13 +77,18 @@ public class GroupByQueryHelper
aggs.toArray(new AggregatorFactory[aggs.size()])
);
Accumulator<IncrementalIndex, Row> accumulator = new Accumulator<IncrementalIndex, Row>()
Accumulator<IncrementalIndex, T> accumulator = new Accumulator<IncrementalIndex, T>()
{
@Override
public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in)
public IncrementalIndex accumulate(IncrementalIndex accumulated, T in)
{
if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions), false) > config.getMaxResults()) {
throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults());
if (in instanceof Row) {
if (accumulated.add(Rows.toCaseInsensitiveInputRow((Row) in, dimensions), false)
> config.getMaxResults()) {
throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults());
}
} else {
throw new ISE("Unable to accumulate something of type [%s]", in.getClass());
}
return accumulated;
@ -95,4 +97,18 @@ public class GroupByQueryHelper
return new Pair<>(index, accumulator);
}
public static <T> Pair<List, Accumulator<List, T>> createBySegmentAccumulatorPair()
{
List init = Lists.newArrayList();
Accumulator<List, T> accumulator = new Accumulator<List, T>()
{
@Override
public List accumulate(List accumulated, T in)
{
accumulated.add(in);
return accumulated;
}
};
return new Pair<>(init, accumulator);
}
}
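A short sketch of how the by-segment pair above is meant to be consumed: a runner's per-segment output sequence is folded into the list half of the pair through the accumulator half. The String element type and sample values are placeholders, not real Druid result objects:

```java
import com.google.common.collect.ImmutableList;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;

import io.druid.query.groupby.GroupByQueryHelper;

import java.util.List;

public class BySegmentAccumulateSketch
{
  public static void main(String[] args)
  {
    // The pair couples an empty list with an accumulator that simply appends
    // whatever each per-segment runner produced.
    Pair<List, Accumulator<List, String>> pair =
        GroupByQueryHelper.<String>createBySegmentAccumulatorPair();

    // Stand-in for the per-segment results a QueryRunner would emit.
    Sequence<String> segmentResults =
        Sequences.simple(ImmutableList.of("segment-1 results", "segment-2 results"));

    List collected = segmentResults.accumulate(pair.lhs, pair.rhs);
    System.out.println(collected); // [segment-1 results, segment-2 results]
  }
}
```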

View File

@ -20,25 +20,32 @@
package io.druid.query.groupby;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.ConcatSequence;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSource;
import io.druid.query.DataSourceUtil;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryCacheHelper;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
@ -46,11 +53,16 @@ import io.druid.query.SubqueryQueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.filter.DimFilter;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Minutes;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -58,6 +70,11 @@ import java.util.Map;
*/
public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
{
private static final byte GROUPBY_QUERY = 0x14;
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE =
new TypeReference<Object>()
{
};
private static final TypeReference<Row> TYPE_REFERENCE = new TypeReference<Row>()
{
};
@ -66,16 +83,20 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
GROUP_BY_MERGE_KEY,
"false"
);
private final Supplier<GroupByQueryConfig> configSupplier;
private final ObjectMapper jsonMapper;
private GroupByQueryEngine engine; // For running the outer query around a subquery
@Inject
public GroupByQueryQueryToolChest(
Supplier<GroupByQueryConfig> configSupplier,
ObjectMapper jsonMapper,
GroupByQueryEngine engine
)
{
this.configSupplier = configSupplier;
this.jsonMapper = jsonMapper;
this.engine = engine;
}
@ -87,11 +108,14 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
@Override
public Sequence<Row> run(Query<Row> input)
{
if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner);
} else {
if (input.getContextBySegment(false)) {
return runner.run(input);
}
if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner);
}
return runner.run(input);
}
};
}
@ -169,7 +193,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
@Override
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
{
return new ConcatSequence<>(seqOfSequences);
return new OrderedMergeSequence<>(Ordering.<Row>natural().nullsFirst(), seqOfSequences);
}
@Override
@ -191,7 +215,10 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
}
@Override
public Function<Row, Row> makePreComputeManipulatorFn(final GroupByQuery query, final MetricManipulationFn fn)
public Function<Row, Row> makePreComputeManipulatorFn(
final GroupByQuery query,
final MetricManipulationFn fn
)
{
return new Function<Row, Row>()
{
@ -200,7 +227,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
{
if (input instanceof MapBasedRow) {
final MapBasedRow inputRow = (MapBasedRow) input;
final Map<String, Object> values = Maps.newHashMap(((MapBasedRow) input).getEvent());
final Map<String, Object> values = Maps.newHashMap(inputRow.getEvent());
for (AggregatorFactory agg : query.getAggregatorSpecs()) {
values.put(agg.getName(), fn.manipulate(agg, inputRow.getEvent().get(agg.getName())));
}
@ -220,8 +247,119 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
@Override
public QueryRunner<Row> preMergeQueryDecoration(QueryRunner<Row> runner)
{
return new SubqueryQueryRunner<Row>(
new IntervalChunkingQueryRunner<Row>(runner, configSupplier.get().getChunkPeriod())
return new SubqueryQueryRunner<>(
new IntervalChunkingQueryRunner<>(runner, configSupplier.get().getChunkPeriod())
);
}
@Override
public CacheStrategy<Row, Object, GroupByQuery> getCacheStrategy(final GroupByQuery query)
{
return new CacheStrategy<Row, Object, GroupByQuery>()
{
@Override
public byte[] computeCacheKey(GroupByQuery query)
{
final DimFilter dimFilter = query.getDimFilter();
final byte[] filterBytes = dimFilter == null ? new byte[]{} : dimFilter.getCacheKey();
final byte[] aggregatorBytes = QueryCacheHelper.computeAggregatorBytes(query.getAggregatorSpecs());
final byte[] granularityBytes = query.getGranularity().cacheKey();
final byte[][] dimensionsBytes = new byte[query.getDimensions().size()][];
int dimensionsBytesSize = 0;
int index = 0;
for (DimensionSpec dimension : query.getDimensions()) {
dimensionsBytes[index] = dimension.getCacheKey();
dimensionsBytesSize += dimensionsBytes[index].length;
++index;
}
final byte[] havingBytes = query.getHavingSpec() == null ? new byte[]{} : query.getHavingSpec().getCacheKey();
final byte[] limitBytes = query.getLimitSpec().getCacheKey();
ByteBuffer buffer = ByteBuffer
.allocate(
1
+ granularityBytes.length
+ filterBytes.length
+ aggregatorBytes.length
+ dimensionsBytesSize
+ havingBytes.length
+ limitBytes.length
)
.put(GROUPBY_QUERY)
.put(granularityBytes)
.put(filterBytes)
.put(aggregatorBytes);
for (byte[] dimensionsByte : dimensionsBytes) {
buffer.put(dimensionsByte);
}
return buffer
.put(havingBytes)
.put(limitBytes)
.array();
}
@Override
public TypeReference<Object> getCacheObjectClazz()
{
return OBJECT_TYPE_REFERENCE;
}
@Override
public Function<Row, Object> prepareForCache()
{
return new Function<Row, Object>()
{
@Override
public Object apply(Row input)
{
if (input instanceof MapBasedRow) {
final MapBasedRow row = (MapBasedRow) input;
final List<Object> retVal = Lists.newArrayListWithCapacity(2);
retVal.add(row.getTimestamp().getMillis());
retVal.add(row.getEvent());
return retVal;
}
throw new ISE("Don't know how to cache input rows of type[%s]", input.getClass());
}
};
}
@Override
public Function<Object, Row> pullFromCache()
{
return new Function<Object, Row>()
{
private final QueryGranularity granularity = query.getGranularity();
@Override
public Row apply(Object input)
{
Iterator<Object> results = ((List<Object>) input).iterator();
DateTime timestamp = granularity.toDateTime(((Number) results.next()).longValue());
return new MapBasedRow(
timestamp,
(Map<String, Object>) jsonMapper.convertValue(
results.next(),
new TypeReference<Map<String, Object>>()
{
}
)
);
}
};
}
@Override
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
{
return new MergeSequence<>(Ordering.<Row>natural().nullsFirst(), seqOfSequences);
}
};
}
}
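The cache strategy added above encodes a group-by query as a single type byte (0x14) followed by the granularity, filter, aggregator, dimension, having, and limit component bytes, written into a ByteBuffer allocated to their exact combined size. A self-contained sketch of that key layout with placeholder component bytes (no real Druid specs are serialized here):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class GroupByCacheKeySketch
{
  private static final byte GROUPBY_QUERY = 0x14;

  public static void main(String[] args)
  {
    // Placeholder byte arrays standing in for granularity.cacheKey(),
    // dimFilter.getCacheKey(), the aggregator bytes, the dimension specs,
    // and the having / limit spec keys of a real query.
    byte[] granularity = "all".getBytes(StandardCharsets.UTF_8);
    byte[] filter = new byte[]{};                        // no filter set
    byte[] aggregators = "count".getBytes(StandardCharsets.UTF_8);
    byte[][] dimensions = {"page".getBytes(StandardCharsets.UTF_8)};
    byte[] having = new byte[]{0x0};                     // e.g. AlwaysHavingSpec
    byte[] limit = new byte[]{0x0};                      // e.g. NoopLimitSpec

    int dimensionsSize = 0;
    for (byte[] d : dimensions) {
      dimensionsSize += d.length;
    }

    // Allocate exactly enough room, then append the components in order.
    ByteBuffer buffer = ByteBuffer
        .allocate(1 + granularity.length + filter.length + aggregators.length
                  + dimensionsSize + having.length + limit.length)
        .put(GROUPBY_QUERY)
        .put(granularity)
        .put(filter)
        .put(aggregators);
    for (byte[] d : dimensions) {
      buffer.put(d);
    }
    byte[] key = buffer.put(having).put(limit).array();

    System.out.println(Arrays.toString(key));
  }
}
```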

View File

@ -22,18 +22,20 @@ package io.druid.query.groupby;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.ExecutorExecutingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Row;
import io.druid.query.AbstractPrioritizedCallable;
import io.druid.query.ConcatQueryRunner;
import io.druid.query.GroupByParallelQueryRunner;
import io.druid.query.Query;
@ -44,7 +46,9 @@ import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
import io.druid.segment.incremental.IncrementalIndex;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
@ -88,8 +92,9 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
{
// mergeRunners should take ListeningExecutorService at some point
final ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator(exec);
if (config.get().isSingleThreaded()) {
return new ConcatQueryRunner<Row>(
return new ConcatQueryRunner<>(
Sequences.map(
Sequences.simple(queryRunners),
new Function<QueryRunner<Row>, QueryRunner<Row>>()
@ -102,34 +107,54 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
@Override
public Sequence<Row> run(final Query<Row> query)
{
final GroupByQuery queryParam = (GroupByQuery) query;
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper
.createIndexAccumulatorPair(
queryParam,
config.get()
);
final Pair<List, Accumulator<List, Row>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
final int priority = query.getContextPriority(0);
final boolean bySegment = query.getContextBySegment(false);
ListenableFuture<Sequence<Row>> future = queryExecutor.submit(
new Callable<Sequence<Row>>()
final ListenableFuture<Void> future = queryExecutor.submit(
new AbstractPrioritizedCallable<Void>(priority)
{
@Override
public Sequence<Row> call() throws Exception
public Void call() throws Exception
{
return new ExecutorExecutingSequence<Row>(
input.run(query),
queryExecutor
);
if (bySegment) {
input.run(queryParam)
.accumulate(
bySegmentAccumulatorPair.lhs,
bySegmentAccumulatorPair.rhs
);
} else {
input.run(query).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
}
return null;
}
}
);
try {
queryWatcher.registerQuery(query, future);
final Number timeout = query.getContextValue("timeout", (Number)null);
return timeout == null ? future.get() : future.get(timeout.longValue(), TimeUnit.MILLISECONDS);
final Number timeout = query.getContextValue("timeout", (Number) null);
if (timeout == null) {
future.get();
} else {
future.get(timeout.longValue(), TimeUnit.MILLISECONDS);
}
}
catch (InterruptedException e) {
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException("Query interrupted");
}
catch(CancellationException e) {
catch (CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
}
catch(TimeoutException e) {
catch (TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException("Query timeout");
@ -137,6 +162,12 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
catch (ExecutionException e) {
throw Throwables.propagate(e.getCause());
}
if (bySegment) {
return Sequences.simple(bySegmentAccumulatorPair.lhs);
}
return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations(null));
}
};
}
@ -144,7 +175,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
)
);
} else {
return new GroupByParallelQueryRunner(queryExecutor, new RowOrdering(), config, queryWatcher, queryRunners);
return new GroupByParallelQueryRunner(queryExecutor, config, queryWatcher, queryRunners);
}
}
@ -175,13 +206,4 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
return engine.process((GroupByQuery) input, adapter);
}
}
private static class RowOrdering extends Ordering<Row>
{
@Override
public int compare(Row left, Row right)
{
return Longs.compare(left.getTimestampFromEpoch(), right.getTimestampFromEpoch());
}
}
}

View File

@ -22,12 +22,21 @@ package io.druid.query.groupby.having;
import io.druid.data.input.Row;
/**
* A "having" spec that always evaluates to true
*/
public class AlwaysHavingSpec implements HavingSpec
{
private static final byte CACHE_KEY = 0x0;
@Override
public boolean eval(Row row)
{
return true;
}
@Override
public byte[] getCacheKey()
{
return new byte[]{CACHE_KEY};
}
}

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.druid.data.input.Row;
import java.nio.ByteBuffer;
import java.util.List;
/**
@ -31,6 +32,8 @@ import java.util.List;
*/
public class AndHavingSpec implements HavingSpec
{
private static final byte CACHE_KEY = 0x2;
private List<HavingSpec> havingSpecs;
@JsonCreator
@ -57,6 +60,25 @@ public class AndHavingSpec implements HavingSpec
return true;
}
@Override
public byte[] getCacheKey()
{
final byte[][] havingBytes = new byte[havingSpecs.size()][];
int havingBytesSize = 0;
int index = 0;
for (HavingSpec havingSpec : havingSpecs) {
havingBytes[index] = havingSpec.getCacheKey();
havingBytesSize += havingBytes[index].length;
++index;
}
ByteBuffer buffer = ByteBuffer.allocate(1 + havingBytesSize).put(CACHE_KEY);
for (byte[] havingByte : havingBytes) {
buffer.put(havingByte);
}
return buffer.array();
}
@Override
public boolean equals(Object o)
{

View File

@ -21,14 +21,20 @@ package io.druid.query.groupby.having;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Bytes;
import io.druid.data.input.Row;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
* The "=" operator in a "having" clause. This is similar to SQL's "having aggregation = value",
* except that in SQL an aggregation is an expression instead of an aggregation name as in Druid.
*/
public class EqualToHavingSpec implements HavingSpec
{
private static final byte CACHE_KEY = 0x3;
private String aggregationName;
private Number value;
@ -62,6 +68,18 @@ public class EqualToHavingSpec implements HavingSpec
return Float.compare(value.floatValue(), metricValue) == 0;
}
@Override
public byte[] getCacheKey()
{
final byte[] aggBytes = aggregationName.getBytes();
final byte[] valBytes = Bytes.toArray(Arrays.asList(value));
return ByteBuffer.allocate(1 + aggBytes.length + valBytes.length)
.put(CACHE_KEY)
.put(aggBytes)
.put(valBytes)
.array();
}
/**
* This method treats internal value as double mainly for ease of test.
*/

View File

@ -21,7 +21,12 @@ package io.druid.query.groupby.having;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Bytes;
import io.druid.data.input.Row;
import io.druid.query.Result;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
* The ">" operator in a "having" clause. This is similar to SQL's "having aggregation > value",
@ -29,6 +34,8 @@ import io.druid.data.input.Row;
*/
public class GreaterThanHavingSpec implements HavingSpec
{
private static final byte CACHE_KEY = 0x4;
private String aggregationName;
private Number value;
@ -62,6 +69,18 @@ public class GreaterThanHavingSpec implements HavingSpec
return Float.compare(metricValue, value.floatValue()) > 0;
}
@Override
public byte[] getCacheKey()
{
final byte[] aggBytes = aggregationName.getBytes();
final byte[] valBytes = Bytes.toArray(Arrays.asList(value));
return ByteBuffer.allocate(1 + aggBytes.length + valBytes.length)
.put(CACHE_KEY)
.put(aggBytes)
.put(valBytes)
.array();
}
/**
* This method treats internal value as double mainly for ease of test.
*/

View File

@ -38,6 +38,11 @@ import io.druid.data.input.Row;
})
public interface HavingSpec
{
// Atoms for easy combination, but for now they are mostly useful
// for testing.
public static final HavingSpec NEVER = new NeverHavingSpec();
public static final HavingSpec ALWAYS = new AlwaysHavingSpec();
/**
* Evaluates if a given row satisfies the having spec.
*
@ -49,22 +54,5 @@ public interface HavingSpec
*/
public boolean eval(Row row);
// Atoms for easy combination, but for now they are mostly useful
// for testing.
/**
* A "having" spec that always evaluates to false
*/
public static final HavingSpec NEVER = new HavingSpec()
{
@Override
public boolean eval(Row row)
{
return false;
}
};
/**
* A "having" spec that always evaluates to true
*/
public static final HavingSpec ALWAYS = new AlwaysHavingSpec();
public byte[] getCacheKey();
}
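With getCacheKey() now part of the interface, composite specs can fold their children's keys into one byte array, and the NEVER/ALWAYS atoms are ordinary specs with fixed one-byte keys. A hedged sketch of combining them; the AndHavingSpec, NotHavingSpec, and GreaterThanHavingSpec constructor signatures are assumed to mirror the OrHavingSpec and LessThanHavingSpec constructors shown elsewhere in this commit:

```java
import com.google.common.collect.ImmutableList;

import io.druid.query.groupby.having.AndHavingSpec;
import io.druid.query.groupby.having.GreaterThanHavingSpec;
import io.druid.query.groupby.having.HavingSpec;
import io.druid.query.groupby.having.NotHavingSpec;

import java.util.Arrays;

public class HavingSpecCacheKeySketch
{
  public static void main(String[] args)
  {
    // ALWAYS and the ">" spec are ANDed together; the composite key is the
    // AndHavingSpec type byte followed by each child's key in order.
    HavingSpec having = new AndHavingSpec(
        ImmutableList.<HavingSpec>of(
            HavingSpec.ALWAYS,
            new GreaterThanHavingSpec("rows", 10) // assumed (aggregation, value) constructor
        )
    );
    System.out.println(Arrays.toString(having.getCacheKey()));

    // NotHavingSpec prefixes its own type byte to the wrapped spec's key.
    System.out.println(Arrays.toString(new NotHavingSpec(HavingSpec.NEVER).getCacheKey()));
  }
}
```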

View File

@ -20,7 +20,12 @@
package io.druid.query.groupby.having;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Bytes;
import io.druid.data.input.Row;
import io.druid.query.Result;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
* The "<" operator in a "having" clause. This is similar to SQL's "having aggregation < value",
@ -28,14 +33,15 @@ import io.druid.data.input.Row;
*/
public class LessThanHavingSpec implements HavingSpec
{
private static final byte CACHE_KEY = 0x5;
private String aggregationName;
private Number value;
public LessThanHavingSpec
(
@JsonProperty("aggregation") String aggName,
@JsonProperty("value") Number value
)
public LessThanHavingSpec(
@JsonProperty("aggregation") String aggName,
@JsonProperty("value") Number value
)
{
this.aggregationName = aggName;
this.value = value;
@ -61,6 +67,18 @@ public class LessThanHavingSpec implements HavingSpec
return Float.compare(metricValue, value.floatValue()) < 0;
}
@Override
public byte[] getCacheKey()
{
final byte[] aggBytes = aggregationName.getBytes();
final byte[] valBytes = Bytes.toArray(Arrays.asList(value));
return ByteBuffer.allocate(1 + aggBytes.length + valBytes.length)
.put(CACHE_KEY)
.put(aggBytes)
.put(valBytes)
.array();
}
/**
* This method treats internal value as double mainly for ease of test.
*/

View File

@ -0,0 +1,42 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.groupby.having;
import io.druid.data.input.Row;
/**
* A "having" spec that always evaluates to false
*/
public class NeverHavingSpec implements HavingSpec
{
private static final byte CACHE_KEY = 0x1;
@Override
public boolean eval(Row row)
{
return false;
}
@Override
public byte[] getCacheKey()
{
return new byte[]{CACHE_KEY};
}
}

View File

@ -22,12 +22,17 @@ package io.druid.query.groupby.having;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.data.input.Row;
import io.druid.query.Result;
import java.nio.ByteBuffer;
/**
* The logical "not" operator for the "having" clause.
*/
public class NotHavingSpec implements HavingSpec
{
private static final byte CACHE_KEY = 0x6;
private HavingSpec havingSpec;
@JsonCreator
@ -48,6 +53,15 @@ public class NotHavingSpec implements HavingSpec
return !havingSpec.eval(row);
}
@Override
public byte[] getCacheKey()
{
return ByteBuffer.allocate(1 + havingSpec.getCacheKey().length)
.put(CACHE_KEY)
.put(havingSpec.getCacheKey())
.array();
}
@Override
public String toString()
{

View File

@ -23,7 +23,9 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.druid.data.input.Row;
import io.druid.query.Result;
import java.nio.ByteBuffer;
import java.util.List;
/**
@ -31,23 +33,27 @@ import java.util.List;
*/
public class OrHavingSpec implements HavingSpec
{
private static final byte CACHE_KEY = 0x7;
private List<HavingSpec> havingSpecs;
@JsonCreator
public OrHavingSpec(@JsonProperty("havingSpecs") List<HavingSpec> havingSpecs) {
public OrHavingSpec(@JsonProperty("havingSpecs") List<HavingSpec> havingSpecs)
{
this.havingSpecs = havingSpecs == null ? ImmutableList.<HavingSpec>of() : havingSpecs;
}
@JsonProperty("havingSpecs")
public List<HavingSpec> getHavingSpecs(){
public List<HavingSpec> getHavingSpecs()
{
return havingSpecs;
}
@Override
public boolean eval(Row row)
{
for(HavingSpec havingSpec: havingSpecs) {
if(havingSpec.eval(row)){
for (HavingSpec havingSpec : havingSpecs) {
if (havingSpec.eval(row)) {
return true;
}
}
@ -55,15 +61,40 @@ public class OrHavingSpec implements HavingSpec
return false;
}
@Override
public byte[] getCacheKey()
{
final byte[][] havingBytes = new byte[havingSpecs.size()][];
int havingBytesSize = 0;
int index = 0;
for (HavingSpec havingSpec : havingSpecs) {
havingBytes[index] = havingSpec.getCacheKey();
havingBytesSize += havingBytes[index].length;
++index;
}
ByteBuffer buffer = ByteBuffer.allocate(1 + havingBytesSize).put(CACHE_KEY);
for (byte[] havingByte : havingBytes) {
buffer.put(havingByte);
}
return buffer.array();
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OrHavingSpec that = (OrHavingSpec) o;
if (havingSpecs != null ? !havingSpecs.equals(that.havingSpecs) : that.havingSpecs != null) return false;
if (havingSpecs != null ? !havingSpecs.equals(that.havingSpecs) : that.havingSpecs != null) {
return false;
}
return true;
}

View File

@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence;
@ -37,6 +38,7 @@ import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@ -46,6 +48,8 @@ import java.util.Map;
*/
public class DefaultLimitSpec implements LimitSpec
{
private static final byte CACHE_KEY = 0x1;
private final List<OrderByColumnSpec> columns;
private final int limit;
@ -196,7 +200,7 @@ public class DefaultLimitSpec implements LimitSpec
@Override
public Sequence<Row> apply(
@Nullable Sequence<Row> input
Sequence<Row> input
)
{
return Sequences.limit(input, limit);
@ -275,12 +279,12 @@ public class DefaultLimitSpec implements LimitSpec
{
this.limit = limit;
this.sorter = new TopNSorter<Row>(ordering);
this.sorter = new TopNSorter<>(ordering);
}
@Override
public Sequence<Row> apply(
@Nullable Sequence<Row> input
Sequence<Row> input
)
{
final ArrayList<Row> materializedList = Sequences.toList(input, Lists.<Row>newArrayList());
@ -347,4 +351,25 @@ public class DefaultLimitSpec implements LimitSpec
result = 31 * result + limit;
return result;
}
@Override
public byte[] getCacheKey()
{
final byte[][] columnBytes = new byte[columns.size()][];
int columnsBytesSize = 0;
int index = 0;
for (OrderByColumnSpec column : columns) {
columnBytes[index] = column.getCacheKey();
columnsBytesSize += columnBytes[index].length;
++index;
}
ByteBuffer buffer = ByteBuffer.allocate(1 + columnsBytesSize + 4)
.put(CACHE_KEY);
for (byte[] columnByte : columnBytes) {
buffer.put(columnByte);
}
buffer.put(Ints.toByteArray(limit));
return buffer.array();
}
}

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Function;
import com.metamx.common.guava.Sequence;
import io.druid.data.input.Row;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
@ -45,4 +46,6 @@ public interface LimitSpec
);
public LimitSpec merge(LimitSpec other);
public byte[] getCacheKey();
}

View File

@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.metamx.common.guava.Sequence;
import io.druid.data.input.Row;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
@ -33,6 +34,8 @@ import java.util.List;
*/
public class NoopLimitSpec implements LimitSpec
{
private static final byte CACHE_KEY = 0x0;
@Override
public Function<Sequence<Row>, Sequence<Row>> build(
List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs
@ -63,4 +66,10 @@ public class NoopLimitSpec implements LimitSpec
public int hashCode() {
return 0;
}
@Override
public byte[] getCacheKey()
{
return new byte[]{CACHE_KEY};
}
}

View File

@ -29,6 +29,7 @@ import com.metamx.common.IAE;
import com.metamx.common.ISE;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@ -37,14 +38,19 @@ import java.util.Map;
*/
public class OrderByColumnSpec
{
public static enum Direction
{
ASCENDING,
DESCENDING
}
/**
* Maintain a map of the enum values so that we can just do a lookup and get a null if it doesn't exist instead
* of an exception thrown.
*/
private static final Map<String, Direction> stupidEnumMap;
static
{
static {
final ImmutableMap.Builder<String, Direction> bob = ImmutableMap.builder();
for (Direction direction : Direction.values()) {
bob.put(direction.toString(), direction);
@ -62,16 +68,14 @@ public class OrderByColumnSpec
if (obj instanceof String) {
return new OrderByColumnSpec(obj.toString(), null);
}
else if (obj instanceof Map) {
} else if (obj instanceof Map) {
final Map map = (Map) obj;
final String dimension = map.get("dimension").toString();
final Direction direction = determineDirection(map.get("direction"));
return new OrderByColumnSpec(dimension, direction);
}
else {
} else {
throw new ISE("Cannot build an OrderByColumnSpec from a %s", obj.getClass());
}
}
@ -176,9 +180,14 @@ public class OrderByColumnSpec
'}';
}
public static enum Direction
public byte[] getCacheKey()
{
ASCENDING,
DESCENDING
final byte[] dimensionBytes = dimension.getBytes();
final byte[] directionBytes = direction.name().getBytes();
return ByteBuffer.allocate(dimensionBytes.length + directionBytes.length)
.put(dimensionBytes)
.put(directionBytes)
.array();
}
}

View File

@ -27,6 +27,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
@ -37,6 +38,7 @@ import io.druid.common.utils.JodaUtils;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSourceUtil;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.ResultMergeQueryRunner;
@ -60,6 +62,14 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
};
private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4};
private final QueryConfig config;
@Inject
public SegmentMetadataQueryQueryToolChest(QueryConfig config)
{
this.config = config;
}
@Override
public QueryRunner<SegmentAnalysis> mergeResults(final QueryRunner<SegmentAnalysis> runner)
{

View File

@ -55,16 +55,19 @@ import java.util.concurrent.TimeoutException;
public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<SegmentAnalysis, SegmentMetadataQuery>
{
private static final SegmentAnalyzer analyzer = new SegmentAnalyzer();
private static final SegmentMetadataQueryQueryToolChest toolChest = new SegmentMetadataQueryQueryToolChest();
private static final Logger log = new Logger(SegmentMetadataQueryRunnerFactory.class);
private final SegmentMetadataQueryQueryToolChest toolChest;
private final QueryWatcher queryWatcher;
@Inject
public SegmentMetadataQueryRunnerFactory(
SegmentMetadataQueryQueryToolChest toolChest,
QueryWatcher queryWatcher
)
{
this.toolChest = toolChest;
this.queryWatcher = queryWatcher;
}

View File

@ -73,6 +73,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
{
};
private final SearchQueryConfig config;
@Inject

View File

@ -70,6 +70,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
new TypeReference<Result<SelectResultValue>>()
{
};
private final QueryConfig config;
private final ObjectMapper jsonMapper;

View File

@ -21,12 +21,7 @@ package io.druid.query.select;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -41,7 +36,8 @@ public class SelectResultValue implements Iterable<EventHolder>
@JsonCreator
public SelectResultValue(
@JsonProperty("pagingIdentifiers") Map<String, Integer> pagingIdentifiers,
@JsonProperty("events") List<EventHolder> events)
@JsonProperty("events") List<EventHolder> events
)
{
this.pagingIdentifiers = pagingIdentifiers;
this.events = events;

View File

@ -71,6 +71,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
new TypeReference<Result<TimeseriesResultValue>>()
{
};
private final QueryConfig config;
@Inject
@ -173,7 +174,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
return new Function<Result<TimeseriesResultValue>, Object>()
{
@Override
public Object apply(@Nullable final Result<TimeseriesResultValue> input)
public Object apply(final Result<TimeseriesResultValue> input)
{
TimeseriesResultValue results = input.getValue();
final List<Object> retVal = Lists.newArrayListWithCapacity(1 + aggs.size());

View File

@ -19,13 +19,11 @@
package io.druid.query.timeseries;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence;
import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;

View File

@ -72,6 +72,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
{
};
private final TopNQueryConfig config;
@Inject

View File

@ -32,7 +32,9 @@ import com.google.common.primitives.Floats;
import com.metamx.common.ISE;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.SpatialDimensionSchema;
import org.joda.time.DateTime;
import java.util.Arrays;
import java.util.List;
@ -128,6 +130,12 @@ public class SpatialDimensionRowFormatter
return row.getTimestampFromEpoch();
}
@Override
public DateTime getTimestamp()
{
return row.getTimestamp();
}
@Override
public List<String> getDimension(String dimension)
{
@ -157,6 +165,12 @@ public class SpatialDimensionRowFormatter
{
return row.toString();
}
@Override
public int compareTo(Row o)
{
return getTimestamp().compareTo(o.getTimestamp());
}
};
if (!spatialPartialDimNames.isEmpty()) {

View File

@ -19,6 +19,7 @@
package io.druid.query.groupby;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
@ -35,6 +36,7 @@ import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.granularity.PeriodGranularity;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@ -99,41 +101,71 @@ public class GroupByQueryRunnerTest
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
{
GroupByQueryConfig config = new GroupByQueryConfig();
final ObjectMapper mapper = new DefaultObjectMapper();
final StupidPool<ByteBuffer> pool = new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(1024 * 1024);
}
}
);
final GroupByQueryConfig config = new GroupByQueryConfig();
config.setMaxIntermediateRows(10000);
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final GroupByQueryEngine engine = new GroupByQueryEngine(
configSupplier,
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(1024 * 1024);
}
}
)
);
final GroupByQueryEngine engine = new GroupByQueryEngine(configSupplier, pool);
final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
engine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
configSupplier,
new GroupByQueryQueryToolChest(configSupplier, engine)
new GroupByQueryQueryToolChest(configSupplier, mapper, engine)
);
GroupByQueryConfig singleThreadedConfig = new GroupByQueryConfig()
{
@Override
public boolean isSingleThreaded()
{
return true;
}
};
singleThreadedConfig.setMaxIntermediateRows(10000);
final Supplier<GroupByQueryConfig> singleThreadedConfigSupplier = Suppliers.ofInstance(singleThreadedConfig);
final GroupByQueryEngine singleThreadEngine = new GroupByQueryEngine(singleThreadedConfigSupplier, pool);
final GroupByQueryRunnerFactory singleThreadFactory = new GroupByQueryRunnerFactory(
singleThreadEngine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
singleThreadedConfigSupplier,
new GroupByQueryQueryToolChest(singleThreadedConfigSupplier, mapper, singleThreadEngine)
);
Function<Object, Object> function = new Function<Object, Object>()
{
@Override
public Object apply(@Nullable Object input)
{
return new Object[]{factory, ((Object[]) input)[0]};
}
};
return Lists.newArrayList(
Iterables.transform(
QueryRunnerTestHelper.makeQueryRunners(factory), new Function<Object, Object>()
{
@Override
public Object apply(@Nullable Object input)
{
return new Object[]{factory, ((Object[]) input)[0]};
}
}
Iterables.concat(
Iterables.transform(
QueryRunnerTestHelper.makeQueryRunners(factory),
function
),
Iterables.transform(
QueryRunnerTestHelper.makeQueryRunners(singleThreadFactory),
function
)
)
);
}
@ -795,7 +827,11 @@ public class GroupByQueryRunnerTest
)
);
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner);
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(
configSupplier,
new DefaultObjectMapper(),
engine
).mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
}
@ -844,7 +880,11 @@ public class GroupByQueryRunnerTest
)
);
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner);
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(
configSupplier,
new DefaultObjectMapper(),
engine
).mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
}
@ -893,7 +933,11 @@ public class GroupByQueryRunnerTest
)
);
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(configSupplier, engine).mergeResults(runner);
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest(
configSupplier,
new DefaultObjectMapper(),
engine
).mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
}
@ -1182,6 +1226,12 @@ public class GroupByQueryRunnerTest
{
return (row.getFloatMetric("idx_subpostagg") < 3800);
}
@Override
public byte[] getCacheKey()
{
return new byte[0];
}
}
)
.addOrderByColumn("alias")
@ -1281,6 +1331,12 @@ public class GroupByQueryRunnerTest
{
return (row.getFloatMetric("idx_subpostagg") < 3800);
}
@Override
public byte[] getCacheKey()
{
return new byte[0];
}
}
)
.addOrderByColumn("alias")
@ -1325,11 +1381,61 @@ public class GroupByQueryRunnerTest
.build();
List<Row> expectedResults = Arrays.asList(
createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 11119.0, "js_outer_agg", 123.92274475097656),
createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 11078.0, "js_outer_agg", 82.62254333496094),
createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 11121.0, "js_outer_agg", 125.58358001708984),
createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 11120.0, "js_outer_agg", 124.13470458984375),
createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 11158.0, "js_outer_agg", 162.74722290039062)
createExpectedRow(
"2011-04-01",
"alias",
"travel",
"rows",
1L,
"idx",
11119.0,
"js_outer_agg",
123.92274475097656
),
createExpectedRow(
"2011-04-01",
"alias",
"technology",
"rows",
1L,
"idx",
11078.0,
"js_outer_agg",
82.62254333496094
),
createExpectedRow(
"2011-04-01",
"alias",
"news",
"rows",
1L,
"idx",
11121.0,
"js_outer_agg",
125.58358001708984
),
createExpectedRow(
"2011-04-01",
"alias",
"health",
"rows",
1L,
"idx",
11120.0,
"js_outer_agg",
124.13470458984375
),
createExpectedRow(
"2011-04-01",
"alias",
"entertainment",
"rows",
1L,
"idx",
11158.0,
"js_outer_agg",
162.74722290039062
)
);
// Subqueries are handled by the ToolChest
@ -1350,7 +1456,6 @@ public class GroupByQueryRunnerTest
return Sequences.toList(queryResult, Lists.<Row>newArrayList());
}
private Row createExpectedRow(final String timestamp, Object... vals)
{
return createExpectedRow(new DateTime(timestamp), vals);
@ -1365,6 +1470,7 @@ public class GroupByQueryRunnerTest
theVals.put(vals[i].toString(), vals[i + 1]);
}
return new MapBasedRow(new DateTime(timestamp), theVals);
DateTime ts = new DateTime(timestamp);
return new MapBasedRow(ts, theVals);
}
}
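The test suite now runs every case against runners from both the default and the single-threaded factory by concatenating the two parameter sets. A minimal JUnit 4 sketch of that pattern, with placeholder parameters standing in for the real factories and runners:

```java
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

@RunWith(Parameterized.class)
public class CombinedFactoriesTest
{
  @Parameterized.Parameters
  public static Collection<Object[]> constructorFeeder()
  {
    // Stand-ins for the runner/factory pairs the real constructorFeeder()
    // derives from the default and single-threaded GroupBy factories.
    List<Object[]> defaultFactoryParams = new ArrayList<>();
    defaultFactoryParams.add(new Object[]{"default"});
    List<Object[]> singleThreadedParams = new ArrayList<>();
    singleThreadedParams.add(new Object[]{"singleThreaded"});

    // Same Iterables.concat pattern as above: every test method runs once
    // per parameter set from either factory.
    return Lists.newArrayList(Iterables.concat(defaultFactoryParams, singleThreadedParams));
  }

  private final String label;

  public CombinedFactoriesTest(String label)
  {
    this.label = label;
  }

  @Test
  public void runsOncePerFactory()
  {
    System.out.println(label);
  }
}
```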

View File

@ -28,6 +28,7 @@ import com.metamx.common.guava.Sequences;
import io.druid.collections.StupidPool;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
@ -74,7 +75,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
engine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
configSupplier,
new GroupByQueryQueryToolChest(configSupplier, engine)
new GroupByQueryQueryToolChest(configSupplier, new DefaultObjectMapper(), engine)
);
final Collection<?> objects = QueryRunnerTestHelper.makeQueryRunners(factory);
@ -98,13 +99,13 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
return Sequences.map(
groupByRunner.run(
GroupByQuery.builder()
.setDataSource(tsQuery.getDataSource())
.setQuerySegmentSpec(tsQuery.getQuerySegmentSpec())
.setGranularity(tsQuery.getGranularity())
.setDimFilter(tsQuery.getDimensionsFilter())
.setAggregatorSpecs(tsQuery.getAggregatorSpecs())
.setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs())
.build()
.setDataSource(tsQuery.getDataSource())
.setQuerySegmentSpec(tsQuery.getQuerySegmentSpec())
.setGranularity(tsQuery.getGranularity())
.setDimFilter(tsQuery.getDimensionsFilter())
.setAggregatorSpecs(tsQuery.getAggregatorSpecs())
.setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs())
.build()
),
new Function<Row, Result<TimeseriesResultValue>>()
{

View File

@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Row;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Result;
import org.junit.Test;
import java.util.ArrayList;
@ -148,6 +149,12 @@ public class HavingSpecTest
counter.incrementAndGet();
return value;
}
@Override
public byte[] getCacheKey()
{
return new byte[0];
}
}
@Test

View File

@ -22,10 +22,10 @@ package io.druid.query.metadata;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequences;
import io.druid.query.LegacyDataSource;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryWatcher;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
@ -67,7 +67,7 @@ public class SegmentAnalyzerTest
final SegmentAnalysis analysis = results.get(0);
Assert.assertEquals("test_1", analysis.getId());
final Map<String,ColumnAnalysis> columns = analysis.getColumns();
final Map<String, ColumnAnalysis> columns = analysis.getColumns();
Assert.assertEquals(TestIndex.COLUMNS.length, columns.size()); // All columns including time
for (String dimension : TestIndex.DIMENSIONS) {
@ -91,12 +91,16 @@ public class SegmentAnalyzerTest
* *Awesome* method name auto-generated by IntelliJ! I love IntelliJ!
*
* @param index
*
* @return
*/
private List<SegmentAnalysis> getSegmentAnalysises(Segment index)
{
final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
(QueryRunnerFactory) new SegmentMetadataQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER), index
(QueryRunnerFactory) new SegmentMetadataQueryRunnerFactory(
new SegmentMetadataQueryQueryToolChest(new QueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
), index
);
final SegmentMetadataQuery query = new SegmentMetadataQuery(

View File

@ -26,6 +26,7 @@ import com.metamx.common.guava.Sequences;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
@ -45,7 +46,9 @@ public class SegmentMetadataQueryTest
{
@SuppressWarnings("unchecked")
private final QueryRunner runner = makeQueryRunner(
new SegmentMetadataQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER)
new SegmentMetadataQueryRunnerFactory(
new SegmentMetadataQueryQueryToolChest(new QueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER)
);
private ObjectMapper mapper = new DefaultObjectMapper();

View File

@ -22,6 +22,7 @@ package io.druid.segment;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.data.input.Row;
import io.druid.query.Result;
import org.junit.Assert;
@ -41,7 +42,11 @@ public class TestHelper
assertResults(expectedResults, results, "");
}
public static <T> void assertExpectedResults(Iterable<Result<T>> expectedResults, Iterable<Result<T>> results, String failMsg)
public static <T> void assertExpectedResults(
Iterable<Result<T>> expectedResults,
Iterable<Result<T>> results,
String failMsg
)
{
assertResults(expectedResults, results, failMsg);
}
@ -56,23 +61,33 @@ public class TestHelper
assertObjects(expectedResults, Sequences.toList(results, Lists.<T>newArrayList()), failMsg);
}
private static <T> void assertResults(Iterable<Result<T>> expectedResults, Iterable<Result<T>> actualResults, String failMsg)
private static <T> void assertResults(
Iterable<Result<T>> expectedResults,
Iterable<Result<T>> actualResults,
String failMsg
)
{
Iterator<? extends Result> resultsIter = actualResults.iterator();
Iterator<? extends Result> resultsIter2 = actualResults.iterator();
Iterator<? extends Result> expectedResultsIter = expectedResults.iterator();
while (resultsIter.hasNext() && resultsIter2.hasNext() && expectedResultsIter.hasNext()) {
Result expectedNext = expectedResultsIter.next();
final Result next = resultsIter.next();
final Result next2 = resultsIter2.next();
Object expectedNext = expectedResultsIter.next();
final Object next = resultsIter.next();
final Object next2 = resultsIter2.next();
assertResult(failMsg, expectedNext, next);
assertResult(
String.format("%s: Second iterator bad, multiple calls to iterator() should be safe", failMsg),
expectedNext,
next2
);
if (expectedNext instanceof Row) {
// HACK! Special casing for groupBy
Assert.assertEquals(failMsg, expectedNext, next);
Assert.assertEquals(failMsg, expectedNext, next2);
} else {
assertResult(failMsg, (Result) expectedNext, (Result) next);
assertResult(
String.format("%s: Second iterator bad, multiple calls to iterator() should be safe", failMsg),
(Result) expectedNext,
(Result) next2
);
}
}
if (resultsIter.hasNext()) {
@ -90,7 +105,9 @@ public class TestHelper
if (expectedResultsIter.hasNext()) {
Assert.fail(
String.format(
"%s: Expected expectedResultsIter to be exhausted, next element was %s", failMsg, expectedResultsIter.next()
"%s: Expected expectedResultsIter to be exhausted, next element was %s",
failMsg,
expectedResultsIter.next()
)
);
}
@ -130,7 +147,9 @@ public class TestHelper
if (expectedResultsIter.hasNext()) {
Assert.fail(
String.format(
"%s: Expected expectedResultsIter to be exhausted, next element was %s", failMsg, expectedResultsIter.next()
"%s: Expected expectedResultsIter to be exhausted, next element was %s",
failMsg,
expectedResultsIter.next()
)
);
}
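
The TestHelper hunks above pull two independent iterators from the actual results and now branch on whether the expected element is a groupBy Row or a Result wrapper. As a side note, here is a standalone sketch of the "iterate twice" check the helper relies on; the class and method names are illustrative and not part of the project:

```java
import java.util.Arrays;
import java.util.Iterator;

class TwoPassIterableCheck
{
  // Confirms that iterator() can be called more than once and that both passes
  // replay the same elements, which is what the helper above asserts for query results.
  static <T> void assertReentrant(Iterable<T> expected, Iterable<T> actual)
  {
    Iterator<T> first = actual.iterator();
    Iterator<T> second = actual.iterator();
    for (T e : expected) {
      if (!first.hasNext() || !second.hasNext()) {
        throw new AssertionError("actual results exhausted before expected results");
      }
      if (!e.equals(first.next()) || !e.equals(second.next())) {
        throw new AssertionError("multiple calls to iterator() should be safe");
      }
    }
    if (first.hasNext() || second.hasNext()) {
      throw new AssertionError("actual results had extra elements");
    }
  }

  public static void main(String[] args)
  {
    assertReentrant(Arrays.asList(1, 2, 3), Arrays.asList(1, 2, 3));
  }
}
```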

View File

@ -127,9 +127,12 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final boolean useCache = query.getContextUseCache(true)
&& strategy != null
&& cacheConfig.isUseCache();
&& cacheConfig.isUseCache()
&& cacheConfig.isQueryCacheable(query);
final boolean populateCache = query.getContextPopulateCache(true)
&& strategy != null && cacheConfig.isPopulateCache();
&& strategy != null
&& cacheConfig.isPopulateCache()
&& cacheConfig.isQueryCacheable(query);
final boolean isBySegment = query.getContextBySegment(false);
@ -239,6 +242,15 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
}
descriptors.add(segment.rhs);
System.out.println(
String.format(
"Server %s has %s_%s_%s",
server.getHost(),
segment.rhs.getInterval(),
segment.rhs.getPartitionNumber(),
segment.rhs.getVersion()
)
);
}
}

View File

@ -78,14 +78,16 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
final boolean populateCache = query.getContextPopulateCache(true)
&& strategy != null
&& cacheConfig.isPopulateCache();
&& cacheConfig.isPopulateCache()
&& cacheConfig.isQueryCacheable(query);
final boolean useCache = query.getContextUseCache(true)
&& strategy != null
&& cacheConfig.isUseCache();
&& strategy != null
&& cacheConfig.isUseCache()
&& cacheConfig.isQueryCacheable(query);
final Cache.NamedKey key;
if(strategy != null && (useCache || populateCache)) {
if (strategy != null && (useCache || populateCache)) {
key = CacheUtil.computeSegmentCacheKey(
segmentIdentifier,
segmentDescriptor,
@ -95,10 +97,10 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
key = null;
}
if(useCache) {
if (useCache) {
final Function cacheFn = strategy.pullFromCache();
final byte[] cachedResult = cache.get(key);
if(cachedResult != null) {
if (cachedResult != null) {
final TypeReference cacheObjectClazz = strategy.getCacheObjectClazz();
return Sequences.map(

View File

@ -20,16 +20,25 @@
package io.druid.client.cache;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.query.Query;
import java.util.Arrays;
import java.util.List;
public class CacheConfig
{
public static String USE_CACHE = "useCache";
public static String POPULATE_CACHE = "populateCache";
@JsonProperty
private boolean useCache = true;
@JsonProperty
private boolean populateCache = true;
@JsonProperty
private List<String> unCacheable = Arrays.asList(Query.GROUP_BY, Query.SELECT);
public boolean isPopulateCache()
{
return populateCache;
@ -39,4 +48,10 @@ public class CacheConfig
{
return useCache;
}
public boolean isQueryCacheable(Query query)
{
// O(n) impl, but I don't think we'll ever have a million query types here
return !unCacheable.contains(query.getType());
}
}
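
Combined with the CachingClusteredClient and CachingQueryRunner hunks above, the effect is that cache reads and writes are skipped for any query type on the unCacheable list, which defaults to groupBy and select. A minimal standalone sketch of that gating idea follows; the class, the method, and the literal type names are illustrative stand-ins, not Druid's API:

```java
import java.util.Arrays;
import java.util.List;

class CacheGateSketch
{
  // Mirrors the default above: groupBy and select results stay out of the cache.
  private static final List<String> UNCACHEABLE = Arrays.asList("groupBy", "select");

  static boolean isQueryCacheable(String queryType)
  {
    // A linear scan is fine; there are only a handful of query types.
    return !UNCACHEABLE.contains(queryType);
  }

  public static void main(String[] args)
  {
    System.out.println(isQueryCacheable("timeseries")); // true
    System.out.println(isQueryCacheable("groupBy"));    // false unless the list is overridden
  }
}
```

Tests that still want to exercise the cache for these types can subclass the config and return true unconditionally, which is what CachingClusteredClientTest does near the end of this commit.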

View File

@ -29,6 +29,7 @@ import com.maxmind.geoip2.exception.GeoIp2Exception;
import com.maxmind.geoip2.model.Omni;
import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
@ -211,6 +212,12 @@ class WikipediaIrcDecoder implements IrcDecoder
return timestamp.getMillis();
}
@Override
public DateTime getTimestamp()
{
return timestamp;
}
@Override
public List<String> getDimension(String dimension)
{
@ -234,6 +241,12 @@ class WikipediaIrcDecoder implements IrcDecoder
return metrics.get(metric);
}
@Override
public int compareTo(Row o)
{
return timestamp.compareTo(o.getTimestamp());
}
@Override
public String toString()
{

View File

@ -25,7 +25,6 @@ import com.google.common.base.Preconditions;
import io.druid.data.input.InputRow;
import java.util.List;
import java.util.Set;
public class LinearShardSpec implements ShardSpec
{

View File

@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@ -42,6 +44,9 @@ import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.RandomServerSelectorStrategy;
import io.druid.client.selector.ServerSelector;
import io.druid.collections.StupidPool;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.granularity.PeriodGranularity;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
@ -56,7 +61,6 @@ import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.query.SegmentDescriptor;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
@ -64,13 +68,23 @@ import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.filter.DimFilter;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryEngine;
import io.druid.query.groupby.GroupByQueryQueryToolChest;
import io.druid.query.search.SearchQueryQueryToolChest;
import io.druid.query.search.SearchResultValue;
import io.druid.query.search.search.InsensitiveContainsSearchQuerySpec;
import io.druid.query.search.search.SearchHit;
import io.druid.query.search.search.SearchQuery;
import io.druid.query.search.search.SearchQueryConfig;
import io.druid.query.select.EventHolder;
import io.druid.query.select.PagingSpec;
import io.druid.query.select.SelectQuery;
import io.druid.query.select.SelectQueryQueryToolChest;
import io.druid.query.select.SelectResultValue;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.query.timeboundary.TimeBoundaryQueryQueryToolChest;
@ -105,6 +119,7 @@ import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
@ -657,19 +672,19 @@ public class CachingClusteredClientTest
@Test
public void testSearchCaching() throws Exception
{
final Druids.SearchQueryBuilder builder = Druids.newSearchQueryBuilder()
.dataSource(DATA_SOURCE)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.limit(1000)
.intervals(SEG_SPEC)
.dimensions(Arrays.asList("a_dim"))
.query("how")
.context(CONTEXT);
testQueryCaching(
client,
new SearchQuery(
new TableDataSource(DATA_SOURCE),
DIM_FILTER,
GRANULARITY,
1000,
SEG_SPEC,
Arrays.asList("a_dim"),
new InsensitiveContainsSearchQuerySpec("how"),
null,
CONTEXT
),
builder.build(),
new Interval("2011-01-01/2011-01-02"),
makeSearchResults(new DateTime("2011-01-01"), "how", "howdy", "howwwwww", "howwy"),
@ -694,6 +709,188 @@ public class CachingClusteredClientTest
new DateTime("2011-01-09T01"), "how6", "howdy6", "howwwwww6", "howww6"
)
);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new SearchQueryQueryToolChest(new SearchQueryConfig()));
TestHelper.assertExpectedResults(
makeSearchResults(
new DateTime("2011-01-01"), "how", "howdy", "howwwwww", "howwy",
new DateTime("2011-01-02"), "how1", "howdy1", "howwwwww1", "howwy1",
new DateTime("2011-01-05"), "how2", "howdy2", "howwwwww2", "howww2",
new DateTime("2011-01-05T01"), "how2", "howdy2", "howwwwww2", "howww2",
new DateTime("2011-01-06"), "how3", "howdy3", "howwwwww3", "howww3",
new DateTime("2011-01-06T01"), "how3", "howdy3", "howwwwww3", "howww3",
new DateTime("2011-01-07"), "how4", "howdy4", "howwwwww4", "howww4",
new DateTime("2011-01-07T01"), "how4", "howdy4", "howwwwww4", "howww4",
new DateTime("2011-01-08"), "how5", "howdy5", "howwwwww5", "howww5",
new DateTime("2011-01-08T01"), "how5", "howdy5", "howwwwww5", "howww5",
new DateTime("2011-01-09"), "how6", "howdy6", "howwwwww6", "howww6",
new DateTime("2011-01-09T01"), "how6", "howdy6", "howwwwww6", "howww6"
),
runner.run(
builder.intervals("2011-01-01/2011-01-10")
.build()
)
);
}
@Test
public void testSelectCaching() throws Exception
{
Druids.SelectQueryBuilder builder = Druids.newSelectQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.dimensions(Arrays.asList("a"))
.metrics(Arrays.asList("rows"))
.pagingSpec(new PagingSpec(null, 3))
.context(CONTEXT);
testQueryCaching(
client,
builder.build(),
new Interval("2011-01-01/2011-01-02"),
makeSelectResults(new DateTime("2011-01-01"), ImmutableMap.of("a", "b", "rows", 1)),
new Interval("2011-01-02/2011-01-03"),
makeSelectResults(new DateTime("2011-01-02"), ImmutableMap.of("a", "c", "rows", 5)),
new Interval("2011-01-05/2011-01-10"),
makeSelectResults(
new DateTime("2011-01-05"), ImmutableMap.of("a", "d", "rows", 5),
new DateTime("2011-01-06"), ImmutableMap.of("a", "e", "rows", 6),
new DateTime("2011-01-07"), ImmutableMap.of("a", "f", "rows", 7),
new DateTime("2011-01-08"), ImmutableMap.of("a", "g", "rows", 8),
new DateTime("2011-01-09"), ImmutableMap.of("a", "h", "rows", 9)
),
new Interval("2011-01-05/2011-01-10"),
makeSelectResults(
new DateTime("2011-01-05T01"), ImmutableMap.of("a", "d", "rows", 5),
new DateTime("2011-01-06T01"), ImmutableMap.of("a", "e", "rows", 6),
new DateTime("2011-01-07T01"), ImmutableMap.of("a", "f", "rows", 7),
new DateTime("2011-01-08T01"), ImmutableMap.of("a", "g", "rows", 8),
new DateTime("2011-01-09T01"), ImmutableMap.of("a", "h", "rows", 9)
)
);
QueryRunner runner = new FinalizeResultsQueryRunner(
client,
new SelectQueryQueryToolChest(
new QueryConfig(),
jsonMapper
)
);
TestHelper.assertExpectedResults(
makeSelectResults(
new DateTime("2011-01-01"), ImmutableMap.of("a", "b", "rows", 1),
new DateTime("2011-01-02"), ImmutableMap.of("a", "c", "rows", 5),
new DateTime("2011-01-05"), ImmutableMap.of("a", "d", "rows", 5),
new DateTime("2011-01-05T01"), ImmutableMap.of("a", "d", "rows", 5),
new DateTime("2011-01-06"), ImmutableMap.of("a", "e", "rows", 6),
new DateTime("2011-01-06T01"), ImmutableMap.of("a", "e", "rows", 6),
new DateTime("2011-01-07"), ImmutableMap.of("a", "f", "rows", 7),
new DateTime("2011-01-07T01"), ImmutableMap.of("a", "f", "rows", 7),
new DateTime("2011-01-08"), ImmutableMap.of("a", "g", "rows", 8),
new DateTime("2011-01-08T01"), ImmutableMap.of("a", "g", "rows", 8),
new DateTime("2011-01-09"), ImmutableMap.of("a", "h", "rows", 9),
new DateTime("2011-01-09T01"), ImmutableMap.of("a", "h", "rows", 9)
),
runner.run(
builder.intervals("2011-01-01/2011-01-10")
.build()
)
);
}
@Test
public void testGroupByCaching() throws Exception
{
GroupByQuery.Builder builder = new GroupByQuery.Builder()
.setDataSource(DATA_SOURCE)
.setQuerySegmentSpec(SEG_SPEC)
.setDimFilter(DIM_FILTER)
.setGranularity(GRANULARITY)
.setDimensions(Arrays.<DimensionSpec>asList(new DefaultDimensionSpec("a", "a")))
.setAggregatorSpecs(AGGS)
.setPostAggregatorSpecs(POST_AGGS)
.setContext(CONTEXT);
testQueryCaching(
client,
builder.build(),
new Interval("2011-01-01/2011-01-02"),
makeGroupByResults(new DateTime("2011-01-01"), ImmutableMap.of("a", "a", "rows", 1, "imps", 1, "impers", 1)),
new Interval("2011-01-02/2011-01-03"),
makeGroupByResults(new DateTime("2011-01-02"), ImmutableMap.of("a", "b", "rows", 2, "imps", 2, "impers", 2)),
new Interval("2011-01-05/2011-01-10"),
makeGroupByResults(
new DateTime("2011-01-05"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3),
new DateTime("2011-01-06"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4),
new DateTime("2011-01-07"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5),
new DateTime("2011-01-08"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6),
new DateTime("2011-01-09"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7)
),
new Interval("2011-01-05/2011-01-10"),
makeGroupByResults(
new DateTime("2011-01-05T01"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3),
new DateTime("2011-01-06T01"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4),
new DateTime("2011-01-07T01"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5),
new DateTime("2011-01-08T01"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6),
new DateTime("2011-01-09T01"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7)
)
);
Supplier<GroupByQueryConfig> configSupplier = new Supplier<GroupByQueryConfig>()
{
@Override
public GroupByQueryConfig get()
{
return new GroupByQueryConfig();
}
};
QueryRunner runner = new FinalizeResultsQueryRunner(
client,
new GroupByQueryQueryToolChest(
configSupplier,
jsonMapper,
new GroupByQueryEngine(
configSupplier,
new StupidPool<>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(1024 * 1024);
}
}
)
)
)
);
TestHelper.assertExpectedObjects(
makeGroupByResults(
new DateTime("2011-01-05T"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3),
new DateTime("2011-01-05T01"), ImmutableMap.of("a", "c", "rows", 3, "imps", 3, "impers", 3),
new DateTime("2011-01-06T"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4),
new DateTime("2011-01-06T01"), ImmutableMap.of("a", "d", "rows", 4, "imps", 4, "impers", 4),
new DateTime("2011-01-07T"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5),
new DateTime("2011-01-07T01"), ImmutableMap.of("a", "e", "rows", 5, "imps", 5, "impers", 5),
new DateTime("2011-01-08T"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6),
new DateTime("2011-01-08T01"), ImmutableMap.of("a", "f", "rows", 6, "imps", 6, "impers", 6),
new DateTime("2011-01-09T"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7),
new DateTime("2011-01-09T01"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7)
),
runner.run(
builder.setInterval("2011-01-05/2011-01-10")
.build()
),
""
);
}
@Test
@ -751,7 +948,8 @@ public class CachingClusteredClientTest
final QueryRunner runner,
final int numTimesToQuery,
boolean expectBySegment,
final Query query, Object... args // does this assume query intervals must be ordered?
final Query query,
Object... args // does this assume query intervals must be ordered?
)
{
if (args.length % 2 != 0) {
@ -843,6 +1041,30 @@ public class CachingClusteredClientTest
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableSearchResults(segmentIds, intervals, results))
.once();
} else if (query instanceof SelectQuery) {
List<String> segmentIds = Lists.newArrayList();
List<Interval> intervals = Lists.newArrayList();
List<Iterable<Result<SelectResultValue>>> results = Lists.newArrayList();
for (ServerExpectation expectation : expectations) {
segmentIds.add(expectation.getSegmentId());
intervals.add(expectation.getInterval());
results.add(expectation.getResults());
}
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableSelectResults(segmentIds, intervals, results))
.once();
} else if (query instanceof GroupByQuery) {
List<String> segmentIds = Lists.newArrayList();
List<Interval> intervals = Lists.newArrayList();
List<Iterable<Row>> results = Lists.newArrayList();
for (ServerExpectation expectation : expectations) {
segmentIds.add(expectation.getSegmentId());
intervals.add(expectation.getInterval());
results.add(expectation.getResults());
}
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableGroupByResults(segmentIds, intervals, results))
.once();
} else if (query instanceof TimeBoundaryQuery) {
List<String> segmentIds = Lists.newArrayList();
List<Interval> intervals = Lists.newArrayList();
@ -1097,6 +1319,74 @@ public class CachingClusteredClientTest
);
}
private Sequence<Result<SelectResultValue>> toQueryableSelectResults(
Iterable<String> segmentIds, Iterable<Interval> intervals, Iterable<Iterable<Result<SelectResultValue>>> results
)
{
return Sequences.simple(
FunctionalIterable
.create(segmentIds)
.trinaryTransform(
intervals,
results,
new TrinaryFn<String, Interval, Iterable<Result<SelectResultValue>>, Result<SelectResultValue>>()
{
@Override
@SuppressWarnings("unchecked")
public Result<SelectResultValue> apply(
final String segmentId,
final Interval interval,
final Iterable<Result<SelectResultValue>> results
)
{
return new Result(
results.iterator().next().getTimestamp(),
new BySegmentResultValueClass(
Lists.newArrayList(results),
segmentId,
interval
)
);
}
}
)
);
}
private Sequence<Result> toQueryableGroupByResults(
Iterable<String> segmentIds, Iterable<Interval> intervals, Iterable<Iterable<Row>> results
)
{
return Sequences.simple(
FunctionalIterable
.create(segmentIds)
.trinaryTransform(
intervals,
results,
new TrinaryFn<String, Interval, Iterable<Row>, Result>()
{
@Override
@SuppressWarnings("unchecked")
public Result apply(
final String segmentId,
final Interval interval,
final Iterable<Row> results
)
{
return new Result(
results.iterator().next().getTimestamp(),
new BySegmentResultValueClass(
Lists.newArrayList(results),
segmentId,
interval
)
);
}
}
)
);
}
private Sequence<Result<TimeBoundaryResultValue>> toQueryableTimeBoundaryResults(
Iterable<String> segmentIds,
Iterable<Interval> intervals,
@ -1305,6 +1595,34 @@ public class CachingClusteredClientTest
return retVal;
}
private Iterable<Result<SelectResultValue>> makeSelectResults(Object... objects)
{
List<Result<SelectResultValue>> retVal = Lists.newArrayList();
int index = 0;
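// varargs layout: a DateTime timestamp followed by one or more event maps for that timestamp, repeated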
while (index < objects.length) {
DateTime timestamp = (DateTime) objects[index++];
List values = Lists.newArrayList();
while (index < objects.length && !(objects[index] instanceof DateTime)) {
values.add(new EventHolder(null, 0, (Map) objects[index++]));
}
retVal.add(new Result<>(timestamp, new SelectResultValue(null, values)));
}
return retVal;
}
private Iterable<Row> makeGroupByResults(Object... objects)
{
List retVal = Lists.newArrayList();
int index = 0;
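// varargs layout: strictly alternating DateTime timestamp and row map pairs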
while (index < objects.length) {
DateTime timestamp = (DateTime) objects[index++];
retVal.add(new MapBasedRow(timestamp, (Map) objects[index++]));
}
return retVal;
}
private <T> T makeMock(List<Object> mocks, Class<T> clazz)
{
T obj = EasyMock.createMock(clazz);
@ -1324,6 +1642,8 @@ public class CachingClusteredClientTest
protected CachingClusteredClient makeClient()
{
final Supplier<GroupByQueryConfig> groupByQueryConfigSupplier = Suppliers.ofInstance(new GroupByQueryConfig());
return new CachingClusteredClient(
new MapQueryToolChestWarehouse(
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
@ -1333,6 +1653,30 @@ public class CachingClusteredClientTest
)
.put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig()))
.put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig()))
.put(
SelectQuery.class,
new SelectQueryQueryToolChest(new QueryConfig(), jsonMapper)
)
.put(
GroupByQuery.class,
new GroupByQueryQueryToolChest(
groupByQueryConfigSupplier,
jsonMapper,
new GroupByQueryEngine(
groupByQueryConfigSupplier,
new StupidPool<>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(1024 * 1024);
}
}
)
)
)
)
.put(TimeBoundaryQuery.class, new TimeBoundaryQueryQueryToolChest())
.build()
),
@ -1364,6 +1708,13 @@ public class CachingClusteredClientTest
cache,
jsonMapper,
new CacheConfig()
{
@Override
public boolean isQueryCacheable(Query query)
{
return true;
}
}
);
}
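
testSelectCaching above is among the new usages of the select query builder, so a compact, self-contained version of that call chain is sketched below; the datasource, dimension, metric, and interval values are placeholders taken from the test, and anything the chain leaves unset falls back to the builder's defaults:

```java
import java.util.Arrays;

import io.druid.query.Druids;
import io.druid.query.select.PagingSpec;
import io.druid.query.select.SelectQuery;

class SelectQuerySketch
{
  public static void main(String[] args)
  {
    // Same shape as the query built in testSelectCaching: one dimension, one metric,
    // and a paging spec that asks for three events per segment.
    SelectQuery query = Druids.newSelectQueryBuilder()
                              .dataSource("example")
                              .intervals("2011-01-01/2011-01-10")
                              .dimensions(Arrays.asList("a"))
                              .metrics(Arrays.asList("rows"))
                              .pagingSpec(new PagingSpec(null, 3))
                              .build();
    System.out.println(query);
  }
}
```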

View File

@ -27,6 +27,7 @@ import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.InputRowParser;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Query;
@ -168,6 +169,12 @@ public class RealtimeManagerTest
return timestamp;
}
@Override
public DateTime getTimestamp()
{
return new DateTime(timestamp);
}
@Override
public List<String> getDimension(String dimension)
{
@ -185,6 +192,12 @@ public class RealtimeManagerTest
{
return null;
}
@Override
public int compareTo(Row o)
{
return 0;
}
};
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
@ -81,6 +82,12 @@ public class SinkTest
return new DateTime("2013-01-01").getMillis();
}
@Override
public DateTime getTimestamp()
{
return new DateTime("2013-01-01");
}
@Override
public List<String> getDimension(String dimension)
{
@ -98,6 +105,12 @@ public class SinkTest
{
return null;
}
@Override
public int compareTo(Row o)
{
return 0;
}
}
);
@ -122,6 +135,12 @@ public class SinkTest
return new DateTime("2013-01-01").getMillis();
}
@Override
public DateTime getTimestamp()
{
return new DateTime("2013-01-01");
}
@Override
public List<String> getDimension(String dimension)
{
@ -139,6 +158,12 @@ public class SinkTest
{
return null;
}
@Override
public int compareTo(Row o)
{
return 0;
}
}
);

View File

@ -25,10 +25,12 @@ import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import io.druid.TestUtil;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.ShardSpec;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.junit.Test;
import java.util.ArrayList;
@ -176,6 +178,12 @@ public class HashBasedNumberedShardSpecTest
return 0;
}
@Override
public DateTime getTimestamp()
{
return new DateTime(0);
}
@Override
public List<String> getDimension(String s)
{
@ -193,5 +201,11 @@ public class HashBasedNumberedShardSpecTest
{
return 0;
}
@Override
public int compareTo(Row o)
{
return 0;
}
}
}
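
The same two overrides, getTimestamp() and compareTo(Row), are added to every hand-rolled InputRow in WikipediaIrcDecoder, RealtimeManagerTest, SinkTest, and this shard spec test, which suggests the Row contract now exposes the event timestamp directly and orders rows by it. Below is a minimal illustrative implementation of that pattern against a stand-in interface rather than Druid's Row, assuming joda-time on the classpath:

```java
import org.joda.time.DateTime;

// Stand-in for the relevant slice of the Row contract (illustrative only).
interface TimestampedRow extends Comparable<TimestampedRow>
{
  DateTime getTimestamp();
}

class FixedTimestampRow implements TimestampedRow
{
  private final DateTime timestamp;

  FixedTimestampRow(DateTime timestamp)
  {
    this.timestamp = timestamp;
  }

  @Override
  public DateTime getTimestamp()
  {
    return timestamp;
  }

  @Override
  public int compareTo(TimestampedRow o)
  {
    // Order rows by event time, mirroring the WikipediaIrcDecoder override above.
    return timestamp.compareTo(o.getTimestamp());
  }

  public static void main(String[] args)
  {
    FixedTimestampRow a = new FixedTimestampRow(new DateTime("2013-01-01"));
    FixedTimestampRow b = new FixedTimestampRow(new DateTime("2013-01-02"));
    System.out.println(a.compareTo(b) < 0); // true: a precedes b
  }
}
```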