From 7b65ca78893576f7d4f50ab9c87aad922cc739f5 Mon Sep 17 00:00:00 2001 From: Jisoo Kim Date: Mon, 18 Apr 2016 14:00:47 -0700 Subject: [PATCH] refactor ClientQuerySegmentWalker (#2837) * refactor ClientQuerySegmentWalker * add header to FluentQueryRunnerBuilder * refactor QueryRunnerTestHelper --- .../druid/query/FluentQueryRunnerBuilder.java | 124 ++++++++++++++++++ .../io/druid/query/QueryRunnerTestHelper.java | 87 ++++++------ .../server/ClientQuerySegmentWalker.java | 76 ++++------- 3 files changed, 188 insertions(+), 99 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/FluentQueryRunnerBuilder.java diff --git a/processing/src/main/java/io/druid/query/FluentQueryRunnerBuilder.java b/processing/src/main/java/io/druid/query/FluentQueryRunnerBuilder.java new file mode 100644 index 00000000000..33aa917b551 --- /dev/null +++ b/processing/src/main/java/io/druid/query/FluentQueryRunnerBuilder.java @@ -0,0 +1,124 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import com.google.common.base.Function; +import com.metamx.common.guava.Sequence; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceMetricEvent; + +import javax.annotation.Nullable; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +public class FluentQueryRunnerBuilder +{ + final QueryToolChest> toolChest; + + public FluentQueryRunner create(QueryRunner baseRunner) { + return new FluentQueryRunner(baseRunner); + } + + public FluentQueryRunnerBuilder(QueryToolChest> toolChest) + { + this.toolChest = toolChest; + } + + public class FluentQueryRunner implements QueryRunner + { + private QueryRunner baseRunner; + + public FluentQueryRunner(QueryRunner runner) + { + this.baseRunner = runner; + } + + @Override + public Sequence run( + Query query, Map responseContext + ) + { + return baseRunner.run(query, responseContext); + } + + public FluentQueryRunner from(QueryRunner runner) { + return new FluentQueryRunner(runner); + } + + public FluentQueryRunner applyPostMergeDecoration() + { + return from( + new FinalizeResultsQueryRunner( + toolChest.postMergeQueryDecoration( + baseRunner + ), + toolChest + ) + ); + } + + public FluentQueryRunner applyPreMergeDecoration() + { + return from( + new UnionQueryRunner( + toolChest.preMergeQueryDecoration( + baseRunner + ) + ) + ); + } + + public FluentQueryRunner emitCPUTimeMetric(ServiceEmitter emitter) + { + return from( + CPUTimeMetricQueryRunner.safeBuild( + baseRunner, + new Function, ServiceMetricEvent.Builder>() + { + @Nullable + @Override + public ServiceMetricEvent.Builder apply(Query tQuery) + { + return toolChest.makeMetricBuilder(tQuery); + } + }, + emitter, + new AtomicLong(0L), + true + ) + ); + } + + public FluentQueryRunner postProcess(PostProcessingOperator postProcessing) + { + return from( + postProcessing != null ? + postProcessing.postProcess(baseRunner) : baseRunner + ); + } + + public FluentQueryRunner mergeResults() + { + return from( + toolChest.mergeResults(baseRunner) + ); + } + } +} diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index 53f0d2c7a9d..3a02e2f0b5f 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -442,19 +442,17 @@ public class QueryRunnerTestHelper Segment adapter ) { - return new FinalizeResultsQueryRunner( - factory.getToolchest().postMergeQueryDecoration( - factory.getToolchest().mergeResults( - new UnionQueryRunner( - new BySegmentQueryRunner( - segmentId, adapter.getDataInterval().getStart(), - factory.createRunner(adapter) - ) + return new FluentQueryRunnerBuilder(factory.getToolchest()) + .create( + new UnionQueryRunner( + new BySegmentQueryRunner( + segmentId, adapter.getDataInterval().getStart(), + factory.createRunner(adapter) ) ) - ), - factory.getToolchest() - ); + ) + .mergeResults() + .applyPostMergeDecoration(); } public static QueryRunner makeFilteringQueryRunner( @@ -462,41 +460,38 @@ public class QueryRunnerTestHelper final QueryRunnerFactory> factory) { final QueryToolChest> toolChest = factory.getToolchest(); - return new FinalizeResultsQueryRunner( - toolChest.postMergeQueryDecoration( - toolChest.mergeResults( - toolChest.preMergeQueryDecoration( - new QueryRunner() - { - @Override - public Sequence run(Query query, Map responseContext) - { - List segments = Lists.newArrayList(); - for (Interval interval : query.getIntervals()) { - segments.addAll(timeline.lookup(interval)); - } - List> sequences = Lists.newArrayList(); - for (TimelineObjectHolder holder : toolChest.filterSegments(query, segments)) { - Segment segment = holder.getObject().getChunk(0).getObject(); - Query running = query.withQuerySegmentSpec( - new SpecificSegmentSpec( - new SegmentDescriptor( - holder.getInterval(), - holder.getVersion(), - 0 - ) - ) - ); - sequences.add(factory.createRunner(segment).run(running, responseContext)); - } - return new MergeSequence<>(query.getResultOrdering(), Sequences.simple(sequences)); - } - } - ) - ) - ), - toolChest - ); + return new FluentQueryRunnerBuilder(toolChest) + .create( + new QueryRunner() + { + @Override + public Sequence run(Query query, Map responseContext) + { + List segments = Lists.newArrayList(); + for (Interval interval : query.getIntervals()) { + segments.addAll(timeline.lookup(interval)); + } + List> sequences = Lists.newArrayList(); + for (TimelineObjectHolder holder : toolChest.filterSegments(query, segments)) { + Segment segment = holder.getObject().getChunk(0).getObject(); + Query running = query.withQuerySegmentSpec( + new SpecificSegmentSpec( + new SegmentDescriptor( + holder.getInterval(), + holder.getVersion(), + 0 + ) + ) + ); + sequences.add(factory.createRunner(segment).run(running, responseContext)); + } + return new MergeSequence<>(query.getResultOrdering(), Sequences.simple(sequences)); + } + } + ) + .applyPreMergeDecoration() + .mergeResults() + .applyPostMergeDecoration(); } public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator() diff --git a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java index c8325ca07dd..3af0014439b 100644 --- a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java @@ -21,13 +21,10 @@ package io.druid.server; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; import com.google.inject.Inject; import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.client.CachingClusteredClient; -import io.druid.query.CPUTimeMetricQueryRunner; -import io.druid.query.FinalizeResultsQueryRunner; +import io.druid.query.FluentQueryRunnerBuilder; import io.druid.query.PostProcessingOperator; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -37,13 +34,8 @@ import io.druid.query.QueryToolChestWarehouse; import io.druid.query.RetryQueryRunner; import io.druid.query.RetryQueryRunnerConfig; import io.druid.query.SegmentDescriptor; -import io.druid.query.UnionQueryRunner; import org.joda.time.Interval; -import javax.annotation.Nullable; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - /** */ public class ClientQuerySegmentWalker implements QuerySegmentWalker @@ -82,53 +74,31 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker return makeRunner(query); } - private QueryRunner makeRunner(final Query query) + private QueryRunner makeRunner(Query query) { - final QueryToolChest> toolChest = warehouse.getToolChest(query); - final QueryRunner baseRunner = CPUTimeMetricQueryRunner.safeBuild( - new FinalizeResultsQueryRunner( - toolChest.postMergeQueryDecoration( - toolChest.mergeResults( - new UnionQueryRunner( - toolChest.preMergeQueryDecoration( - new RetryQueryRunner( - baseClient, - toolChest, - retryConfig, - objectMapper - ) - ) - ) - ) - ), - toolChest - ), - new Function, ServiceMetricEvent.Builder>() + QueryToolChest> toolChest = warehouse.getToolChest(query); + PostProcessingOperator postProcessing = objectMapper.convertValue( + query.getContextValue("postProcessing"), + new TypeReference>() { - @Nullable - @Override - public ServiceMetricEvent.Builder apply(Query tQuery) - { - return toolChest.makeMetricBuilder(tQuery); - } - }, - emitter, - new AtomicLong(0L), - true + } ); - final Map context = query.getContext(); - PostProcessingOperator postProcessing = null; - if (context != null) { - postProcessing = objectMapper.convertValue( - context.get("postProcessing"), - new TypeReference>() - { - } - ); - } - - return postProcessing != null ? - postProcessing.postProcess(baseRunner) : baseRunner; + return new FluentQueryRunnerBuilder<>(toolChest) + .create( + new RetryQueryRunner<>( + baseClient, + toolChest, + retryConfig, + objectMapper + ) + ) + .applyPreMergeDecoration() + .mergeResults() + .applyPostMergeDecoration() + .emitCPUTimeMetric(emitter) + .postProcess(postProcessing); } + + }