mirror of
https://github.com/apache/druid.git
synced 2025-02-09 03:24:55 +00:00
refactor ClientQuerySegmentWalker (#2837)
* refactor ClientQuerySegmentWalker * add header to FluentQueryRunnerBuilder * refactor QueryRunnerTestHelper
This commit is contained in:
parent
838768c632
commit
7b65ca7889
@ -0,0 +1,124 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.query;
|
||||||
|
|
||||||
|
import com.google.common.base.Function;
|
||||||
|
import com.metamx.common.guava.Sequence;
|
||||||
|
import com.metamx.emitter.service.ServiceEmitter;
|
||||||
|
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
public class FluentQueryRunnerBuilder<T>
|
||||||
|
{
|
||||||
|
final QueryToolChest<T, Query<T>> toolChest;
|
||||||
|
|
||||||
|
public FluentQueryRunner create(QueryRunner<T> baseRunner) {
|
||||||
|
return new FluentQueryRunner(baseRunner);
|
||||||
|
}
|
||||||
|
|
||||||
|
public FluentQueryRunnerBuilder(QueryToolChest<T, Query<T>> toolChest)
|
||||||
|
{
|
||||||
|
this.toolChest = toolChest;
|
||||||
|
}
|
||||||
|
|
||||||
|
public class FluentQueryRunner implements QueryRunner<T>
|
||||||
|
{
|
||||||
|
private QueryRunner<T> baseRunner;
|
||||||
|
|
||||||
|
public FluentQueryRunner(QueryRunner<T> runner)
|
||||||
|
{
|
||||||
|
this.baseRunner = runner;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Sequence<T> run(
|
||||||
|
Query<T> query, Map<String, Object> responseContext
|
||||||
|
)
|
||||||
|
{
|
||||||
|
return baseRunner.run(query, responseContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
public FluentQueryRunner from(QueryRunner<T> runner) {
|
||||||
|
return new FluentQueryRunner(runner);
|
||||||
|
}
|
||||||
|
|
||||||
|
public FluentQueryRunner applyPostMergeDecoration()
|
||||||
|
{
|
||||||
|
return from(
|
||||||
|
new FinalizeResultsQueryRunner<T>(
|
||||||
|
toolChest.postMergeQueryDecoration(
|
||||||
|
baseRunner
|
||||||
|
),
|
||||||
|
toolChest
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public FluentQueryRunner applyPreMergeDecoration()
|
||||||
|
{
|
||||||
|
return from(
|
||||||
|
new UnionQueryRunner<T>(
|
||||||
|
toolChest.preMergeQueryDecoration(
|
||||||
|
baseRunner
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public FluentQueryRunner emitCPUTimeMetric(ServiceEmitter emitter)
|
||||||
|
{
|
||||||
|
return from(
|
||||||
|
CPUTimeMetricQueryRunner.safeBuild(
|
||||||
|
baseRunner,
|
||||||
|
new Function<Query<T>, ServiceMetricEvent.Builder>()
|
||||||
|
{
|
||||||
|
@Nullable
|
||||||
|
@Override
|
||||||
|
public ServiceMetricEvent.Builder apply(Query<T> tQuery)
|
||||||
|
{
|
||||||
|
return toolChest.makeMetricBuilder(tQuery);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emitter,
|
||||||
|
new AtomicLong(0L),
|
||||||
|
true
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public FluentQueryRunner postProcess(PostProcessingOperator<T> postProcessing)
|
||||||
|
{
|
||||||
|
return from(
|
||||||
|
postProcessing != null ?
|
||||||
|
postProcessing.postProcess(baseRunner) : baseRunner
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public FluentQueryRunner mergeResults()
|
||||||
|
{
|
||||||
|
return from(
|
||||||
|
toolChest.mergeResults(baseRunner)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -442,19 +442,17 @@ public class QueryRunnerTestHelper
|
|||||||
Segment adapter
|
Segment adapter
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
return new FinalizeResultsQueryRunner<T>(
|
return new FluentQueryRunnerBuilder<T>(factory.getToolchest())
|
||||||
factory.getToolchest().postMergeQueryDecoration(
|
.create(
|
||||||
factory.getToolchest().mergeResults(
|
new UnionQueryRunner<T>(
|
||||||
new UnionQueryRunner<T>(
|
new BySegmentQueryRunner<T>(
|
||||||
new BySegmentQueryRunner<T>(
|
segmentId, adapter.getDataInterval().getStart(),
|
||||||
segmentId, adapter.getDataInterval().getStart(),
|
factory.createRunner(adapter)
|
||||||
factory.createRunner(adapter)
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
),
|
)
|
||||||
factory.getToolchest()
|
.mergeResults()
|
||||||
);
|
.applyPostMergeDecoration();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T> QueryRunner<T> makeFilteringQueryRunner(
|
public static <T> QueryRunner<T> makeFilteringQueryRunner(
|
||||||
@ -462,41 +460,38 @@ public class QueryRunnerTestHelper
|
|||||||
final QueryRunnerFactory<T, Query<T>> factory) {
|
final QueryRunnerFactory<T, Query<T>> factory) {
|
||||||
|
|
||||||
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
|
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
|
||||||
return new FinalizeResultsQueryRunner(
|
return new FluentQueryRunnerBuilder<T>(toolChest)
|
||||||
toolChest.postMergeQueryDecoration(
|
.create(
|
||||||
toolChest.mergeResults(
|
new QueryRunner<T>()
|
||||||
toolChest.preMergeQueryDecoration(
|
{
|
||||||
new QueryRunner<T>()
|
@Override
|
||||||
{
|
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
|
||||||
@Override
|
{
|
||||||
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
|
List<TimelineObjectHolder> segments = Lists.newArrayList();
|
||||||
{
|
for (Interval interval : query.getIntervals()) {
|
||||||
List<TimelineObjectHolder> segments = Lists.newArrayList();
|
segments.addAll(timeline.lookup(interval));
|
||||||
for (Interval interval : query.getIntervals()) {
|
}
|
||||||
segments.addAll(timeline.lookup(interval));
|
List<Sequence<T>> sequences = Lists.newArrayList();
|
||||||
}
|
for (TimelineObjectHolder<String, Segment> holder : toolChest.filterSegments(query, segments)) {
|
||||||
List<Sequence<T>> sequences = Lists.newArrayList();
|
Segment segment = holder.getObject().getChunk(0).getObject();
|
||||||
for (TimelineObjectHolder<String, Segment> holder : toolChest.filterSegments(query, segments)) {
|
Query running = query.withQuerySegmentSpec(
|
||||||
Segment segment = holder.getObject().getChunk(0).getObject();
|
new SpecificSegmentSpec(
|
||||||
Query running = query.withQuerySegmentSpec(
|
new SegmentDescriptor(
|
||||||
new SpecificSegmentSpec(
|
holder.getInterval(),
|
||||||
new SegmentDescriptor(
|
holder.getVersion(),
|
||||||
holder.getInterval(),
|
0
|
||||||
holder.getVersion(),
|
)
|
||||||
0
|
)
|
||||||
)
|
);
|
||||||
)
|
sequences.add(factory.createRunner(segment).run(running, responseContext));
|
||||||
);
|
}
|
||||||
sequences.add(factory.createRunner(segment).run(running, responseContext));
|
return new MergeSequence<>(query.getResultOrdering(), Sequences.simple(sequences));
|
||||||
}
|
}
|
||||||
return new MergeSequence<>(query.getResultOrdering(), Sequences.simple(sequences));
|
}
|
||||||
}
|
)
|
||||||
}
|
.applyPreMergeDecoration()
|
||||||
)
|
.mergeResults()
|
||||||
)
|
.applyPostMergeDecoration();
|
||||||
),
|
|
||||||
toolChest
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
|
public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
|
||||||
|
@ -21,13 +21,10 @@ package io.druid.server;
|
|||||||
|
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.base.Function;
|
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.metamx.emitter.service.ServiceEmitter;
|
import com.metamx.emitter.service.ServiceEmitter;
|
||||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
|
||||||
import io.druid.client.CachingClusteredClient;
|
import io.druid.client.CachingClusteredClient;
|
||||||
import io.druid.query.CPUTimeMetricQueryRunner;
|
import io.druid.query.FluentQueryRunnerBuilder;
|
||||||
import io.druid.query.FinalizeResultsQueryRunner;
|
|
||||||
import io.druid.query.PostProcessingOperator;
|
import io.druid.query.PostProcessingOperator;
|
||||||
import io.druid.query.Query;
|
import io.druid.query.Query;
|
||||||
import io.druid.query.QueryRunner;
|
import io.druid.query.QueryRunner;
|
||||||
@ -37,13 +34,8 @@ import io.druid.query.QueryToolChestWarehouse;
|
|||||||
import io.druid.query.RetryQueryRunner;
|
import io.druid.query.RetryQueryRunner;
|
||||||
import io.druid.query.RetryQueryRunnerConfig;
|
import io.druid.query.RetryQueryRunnerConfig;
|
||||||
import io.druid.query.SegmentDescriptor;
|
import io.druid.query.SegmentDescriptor;
|
||||||
import io.druid.query.UnionQueryRunner;
|
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public class ClientQuerySegmentWalker implements QuerySegmentWalker
|
public class ClientQuerySegmentWalker implements QuerySegmentWalker
|
||||||
@ -82,53 +74,31 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
|
|||||||
return makeRunner(query);
|
return makeRunner(query);
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> QueryRunner<T> makeRunner(final Query<T> query)
|
private <T> QueryRunner<T> makeRunner(Query<T> query)
|
||||||
{
|
{
|
||||||
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
|
QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
|
||||||
final QueryRunner<T> baseRunner = CPUTimeMetricQueryRunner.safeBuild(
|
PostProcessingOperator<T> postProcessing = objectMapper.convertValue(
|
||||||
new FinalizeResultsQueryRunner<T>(
|
query.<String>getContextValue("postProcessing"),
|
||||||
toolChest.postMergeQueryDecoration(
|
new TypeReference<PostProcessingOperator<T>>()
|
||||||
toolChest.mergeResults(
|
|
||||||
new UnionQueryRunner<T>(
|
|
||||||
toolChest.preMergeQueryDecoration(
|
|
||||||
new RetryQueryRunner<T>(
|
|
||||||
baseClient,
|
|
||||||
toolChest,
|
|
||||||
retryConfig,
|
|
||||||
objectMapper
|
|
||||||
)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
),
|
|
||||||
toolChest
|
|
||||||
),
|
|
||||||
new Function<Query<T>, ServiceMetricEvent.Builder>()
|
|
||||||
{
|
{
|
||||||
@Nullable
|
}
|
||||||
@Override
|
|
||||||
public ServiceMetricEvent.Builder apply(Query<T> tQuery)
|
|
||||||
{
|
|
||||||
return toolChest.makeMetricBuilder(tQuery);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
emitter,
|
|
||||||
new AtomicLong(0L),
|
|
||||||
true
|
|
||||||
);
|
);
|
||||||
|
|
||||||
final Map<String, Object> context = query.getContext();
|
return new FluentQueryRunnerBuilder<>(toolChest)
|
||||||
PostProcessingOperator<T> postProcessing = null;
|
.create(
|
||||||
if (context != null) {
|
new RetryQueryRunner<>(
|
||||||
postProcessing = objectMapper.convertValue(
|
baseClient,
|
||||||
context.get("postProcessing"),
|
toolChest,
|
||||||
new TypeReference<PostProcessingOperator<T>>()
|
retryConfig,
|
||||||
{
|
objectMapper
|
||||||
}
|
)
|
||||||
);
|
)
|
||||||
}
|
.applyPreMergeDecoration()
|
||||||
|
.mergeResults()
|
||||||
return postProcessing != null ?
|
.applyPostMergeDecoration()
|
||||||
postProcessing.postProcess(baseRunner) : baseRunner;
|
.emitCPUTimeMetric(emitter)
|
||||||
|
.postProcess(postProcessing);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user