mirror of https://github.com/apache/druid.git
Add some javadoc to the two Query processing interfaces to help aid in implementations of new Queries.
Also, remove some comments that did not have enough context to actually make sense to anyone but the original author (at least, I hope they make sense to the author, I definitely don't know what was being said).
This commit is contained in:
parent
700dc6fbc0
commit
7517f0d0f0
|
@ -22,10 +22,45 @@ import io.druid.segment.Segment;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* An interface that defines the nitty gritty implementation detauls of a Query on a Segment
|
||||||
*/
|
*/
|
||||||
public interface QueryRunnerFactory<T, QueryType extends Query<T>>
|
public interface QueryRunnerFactory<T, QueryType extends Query<T>>
|
||||||
{
|
{
|
||||||
|
/**
|
||||||
|
* Given a specific segment, this method will create a QueryRunner.
|
||||||
|
*
|
||||||
|
* The QueryRunner, when asked, will generate a Sequence of results based on the given segment. This
|
||||||
|
* is the meat of the query processing and is where the results are actually generated. Everything else
|
||||||
|
* is just merging and reduction logic.
|
||||||
|
*
|
||||||
|
* @param segment The segment to process
|
||||||
|
* @return A QueryRunner that, when asked, will generate a Sequence of results based on the given segment
|
||||||
|
*/
|
||||||
public QueryRunner<T> createRunner(Segment segment);
|
public QueryRunner<T> createRunner(Segment segment);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runners generated with createRunner() and combined into an Iterable in (time,shardId) order are passed
|
||||||
|
* along to this method with an ExecutorService. The method should then return a QueryRunner that, when
|
||||||
|
* asked, will use the ExecutorService to run the base QueryRunners in some fashion.
|
||||||
|
*
|
||||||
|
* The vast majority of the time, this should be implemented with
|
||||||
|
*
|
||||||
|
* return new ChainedExecutionQueryRunner<>(
|
||||||
|
* queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
|
||||||
|
* );
|
||||||
|
*
|
||||||
|
* Which will allow for parallel execution up to the maximum number of processing threads allowed.
|
||||||
|
*
|
||||||
|
* @param queryExecutor ExecutorService to be used for parallel processing
|
||||||
|
* @param queryRunners Individual QueryRunner objects that produce some results
|
||||||
|
* @return a QueryRunner that, when asked, will use the ExecutorService to runt he base QueryRunners
|
||||||
|
*/
|
||||||
public QueryRunner<T> mergeRunners(ExecutorService queryExecutor, Iterable<QueryRunner<T>> queryRunners);
|
public QueryRunner<T> mergeRunners(ExecutorService queryExecutor, Iterable<QueryRunner<T>> queryRunners);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provides access to the toolchest for this specific query type.
|
||||||
|
*
|
||||||
|
* @return an instance of the toolchest for this specific query type.
|
||||||
|
*/
|
||||||
public QueryToolChest<T, QueryType> getToolchest();
|
public QueryToolChest<T, QueryType> getToolchest();
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,53 +33,161 @@ import java.util.List;
|
||||||
*/
|
*/
|
||||||
public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultType>>
|
public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultType>>
|
||||||
{
|
{
|
||||||
|
/**
|
||||||
|
* This method wraps a QueryRunner. The input QueryRunner, by contract, will provide a series of
|
||||||
|
* ResultType objects in time order (ascending). This method should return a new QueryRunner that
|
||||||
|
* potentially merges the stream of ordered ResultType objects.
|
||||||
|
*
|
||||||
|
* @param runner A QueryRunner that provides a series of ResultType objects in time order (ascending)
|
||||||
|
* @return a QueryRunner that potentialy merges the stream of ordered ResultType objects
|
||||||
|
*/
|
||||||
public abstract QueryRunner<ResultType> mergeResults(QueryRunner<ResultType> runner);
|
public abstract QueryRunner<ResultType> mergeResults(QueryRunner<ResultType> runner);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method doesn't belong here, but it's here for now just to make it work.
|
* This method doesn't belong here, but it's here for now just to make it work. The method needs to
|
||||||
|
* take a Sequence of Sequences and return a single Sequence of ResultType objects in time-order (ascending)
|
||||||
|
*
|
||||||
|
* This method assumes that its input sequences provide values already in sorted order.
|
||||||
|
* Even more specifically, it assumes that the individual sequences are also ordered by their first element.
|
||||||
|
*
|
||||||
|
* In the vast majority of cases, this should just be implemented with:
|
||||||
|
*
|
||||||
|
* return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
|
||||||
*
|
*
|
||||||
* @param seqOfSequences sequence of sequences to be merged
|
* @param seqOfSequences sequence of sequences to be merged
|
||||||
*
|
|
||||||
* @return the sequence of merged results
|
* @return the sequence of merged results
|
||||||
*/
|
*/
|
||||||
public abstract Sequence<ResultType> mergeSequences(Sequence<Sequence<ResultType>> seqOfSequences);
|
public abstract Sequence<ResultType> mergeSequences(Sequence<Sequence<ResultType>> seqOfSequences);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method doesn't belong here, but it's here for now just to make it work. The method needs to
|
||||||
|
* take a Sequence of Sequences and return a single Sequence of ResultType objects in time-order (ascending)
|
||||||
|
*
|
||||||
|
* This method assumes that its input sequences provide values already in sorted order, but, unlike
|
||||||
|
* mergeSequences, it does *not* assume that the individual sequences are also ordered by their first element.
|
||||||
|
*
|
||||||
|
* In the vast majority if ocases, this hsould just be implemented with:
|
||||||
|
*
|
||||||
|
* return new MergeSequence<>(getOrdering(), seqOfSequences);
|
||||||
|
*
|
||||||
|
* @param seqOfSequences sequence of sequences to be merged
|
||||||
|
* @return the sequence of merged results
|
||||||
|
*/
|
||||||
public abstract Sequence<ResultType> mergeSequencesUnordered(Sequence<Sequence<ResultType>> seqOfSequences);
|
public abstract Sequence<ResultType> mergeSequencesUnordered(Sequence<Sequence<ResultType>> seqOfSequences);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a builder that is used to generate a metric for this specific query type. This exists
|
||||||
|
* to allow for query-specific dimensions on metrics. That is, the ToolChest is expected to set some
|
||||||
|
* meaningful dimensions for metrics given this query type. Examples might be the topN threshhold for
|
||||||
|
* a TopN query or the number of dimensions included for a groupBy query.
|
||||||
|
*
|
||||||
|
* @param query The query that is being processed
|
||||||
|
* @return A MetricEvent.Builder that can be used to make metrics for the provided query
|
||||||
|
*/
|
||||||
public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query);
|
public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a Function that can take in a ResultType and return a new ResultType having applied
|
||||||
|
* the MetricManipulatorFn to each of the metrics.
|
||||||
|
*
|
||||||
|
* This exists because the QueryToolChest is the only thing that understands the internal serialization
|
||||||
|
* format of ResultType, so it's primary responsibility is to "decompose" that structure and apply the
|
||||||
|
* given function to all metrics.
|
||||||
|
*
|
||||||
|
* This function is called very early in the processing pipeline on the Broker.
|
||||||
|
*
|
||||||
|
* @param query The Query that is currently being processed
|
||||||
|
* @param fn The function that should be applied to all metrics in the results
|
||||||
|
* @return A function that will apply the provided fn to all metrics in the input ResultType object
|
||||||
|
*/
|
||||||
public abstract Function<ResultType, ResultType> makePreComputeManipulatorFn(
|
public abstract Function<ResultType, ResultType> makePreComputeManipulatorFn(
|
||||||
QueryType query,
|
QueryType query,
|
||||||
MetricManipulationFn fn
|
MetricManipulationFn fn
|
||||||
);
|
);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generally speaking this is the exact same thing as makePreComputeManipulatorFn. It is leveraged in
|
||||||
|
* order to compute PostAggregators on results after they have been completely merged together, which
|
||||||
|
* should actually be done in the mergeResults() call instead of here.
|
||||||
|
*
|
||||||
|
* This should never actually be overridden and it should be removed as quickly as possible.
|
||||||
|
*
|
||||||
|
* @param query The Query that is currently being processed
|
||||||
|
* @param fn The function that should be applied to all metrics in the results
|
||||||
|
* @return A function that will apply the provided fn to all metrics in the input ResultType object
|
||||||
|
*/
|
||||||
public Function<ResultType, ResultType> makePostComputeManipulatorFn(QueryType query, MetricManipulationFn fn)
|
public Function<ResultType, ResultType> makePostComputeManipulatorFn(QueryType query, MetricManipulationFn fn)
|
||||||
{
|
{
|
||||||
return makePreComputeManipulatorFn(query, fn);
|
return makePreComputeManipulatorFn(query, fn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a TypeReference object that is just passed through to Jackson in order to deserialize
|
||||||
|
* the results of this type of query.
|
||||||
|
*
|
||||||
|
* @return A TypeReference to indicate to Jackson what type of data will exist for this query
|
||||||
|
*/
|
||||||
public abstract TypeReference<ResultType> getResultTypeReference();
|
public abstract TypeReference<ResultType> getResultTypeReference();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a CacheStrategy to be used to load data into the cache and remove it from the cache.
|
||||||
|
*
|
||||||
|
* This is optional. If it returns null, caching is effectively disabled for the query.
|
||||||
|
*
|
||||||
|
* @param query The query whose results might be cached
|
||||||
|
* @param <T> The type of object that will be stored in the cache
|
||||||
|
* @return A CacheStrategy that can be used to populate and read from the Cache
|
||||||
|
*/
|
||||||
public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query)
|
public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query)
|
||||||
{
|
{
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wraps a QueryRunner. The input QueryRunner is the QueryRunner as it exists *before* being passed to
|
||||||
|
* mergeResults().
|
||||||
|
*
|
||||||
|
* In fact, the return value of this method is always passed to mergeResults, so it is equivalent to
|
||||||
|
* just implement this functionality as extra decoration on the QueryRunner during mergeResults().
|
||||||
|
*
|
||||||
|
* In the interests of potentially simplifying these interfaces, the recommendation is to actually not
|
||||||
|
* override this method and instead apply anything that might be needed here in the mergeResults() call.
|
||||||
|
*
|
||||||
|
* @param runner The runner to be wrapped
|
||||||
|
* @return The wrapped runner
|
||||||
|
*/
|
||||||
public QueryRunner<ResultType> preMergeQueryDecoration(QueryRunner<ResultType> runner)
|
public QueryRunner<ResultType> preMergeQueryDecoration(QueryRunner<ResultType> runner)
|
||||||
{
|
{
|
||||||
return runner;
|
return runner;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wraps a QueryRunner. The input QueryRunner is the QueryRunner as it exists coming out of mergeResults()
|
||||||
|
*
|
||||||
|
* In fact, the input value of this method is always the return value from mergeResults, so it is equivalent
|
||||||
|
* to just implement this functionality as extra decoration on the QueryRunner during mergeResults().
|
||||||
|
*
|
||||||
|
* In the interests of potentially simplifying these interfaces, the recommendation is to actually not
|
||||||
|
* override this method and instead apply anything that might be needed here in the mergeResults() call.
|
||||||
|
*
|
||||||
|
* @param runner The runner to be wrapped
|
||||||
|
* @return The wrapped runner
|
||||||
|
*/
|
||||||
public QueryRunner<ResultType> postMergeQueryDecoration(QueryRunner<ResultType> runner)
|
public QueryRunner<ResultType> postMergeQueryDecoration(QueryRunner<ResultType> runner)
|
||||||
{
|
{
|
||||||
return runner;
|
return runner;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param query
|
* This method is called to allow the query to prune segments that it does not believe need to actually
|
||||||
* @param segments list of segments sorted by segment intervals.
|
* be queried. It can use whatever criteria it wants in order to do the pruning, it just needs to
|
||||||
* @return list of segments to be queried in order to determine query results.
|
* return the list of Segments it actually wants to see queried.
|
||||||
|
*
|
||||||
|
* @param query The query being processed
|
||||||
|
* @param segments The list of candidate segments to be queried
|
||||||
|
* @param <T> A Generic parameter because Java is cool
|
||||||
|
* @return The list of segments to actually query
|
||||||
*/
|
*/
|
||||||
public <T extends LogicalSegment> List<T> filterSegments(QueryType query, List<T> segments)
|
public <T extends LogicalSegment> List<T> filterSegments(QueryType query, List<T> segments)
|
||||||
{
|
{
|
||||||
|
|
|
@ -193,7 +193,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
|
||||||
+ prunedAggs.size()
|
+ prunedAggs.size()
|
||||||
+ 1
|
+ 1
|
||||||
);
|
);
|
||||||
// JVM couldn't optimize this too well, so this is helping it out a bit.
|
|
||||||
for (int i = 0; i < aggregatorFactories.length; ++i) {
|
for (int i = 0; i < aggregatorFactories.length; ++i) {
|
||||||
final String aggName = aggFactoryNames[i];
|
final String aggName = aggFactoryNames[i];
|
||||||
values.put(aggName, fn.manipulate(aggregatorFactories[i], input.getMetric(aggName)));
|
values.put(aggName, fn.manipulate(aggregatorFactories[i], input.getMetric(aggName)));
|
||||||
|
@ -253,8 +253,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
|
||||||
+ query.getPostAggregatorSpecs().size()
|
+ query.getPostAggregatorSpecs().size()
|
||||||
+ 1
|
+ 1
|
||||||
);
|
);
|
||||||
// put non finalized aggregators for calculating dependent post Aggregators
|
|
||||||
// JVM is dumb about optimization
|
|
||||||
for( int i = 0; i < aggFactoryNames.length; ++i){
|
for( int i = 0; i < aggFactoryNames.length; ++i){
|
||||||
final String name = aggFactoryNames[i];
|
final String name = aggFactoryNames[i];
|
||||||
values.put(name, input.getMetric(name));
|
values.put(name, input.getMetric(name));
|
||||||
|
|
Loading…
Reference in New Issue