From 55ebf0cfdf5c290a462f188d71c806b595eec689 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 4 Mar 2015 18:12:31 -0600 Subject: [PATCH] force eager the processing of segment metadata query on the processing threadpool by using ChainedExecutionQueryRunner in SegmentMetadataQueryRunnerFactory.mergeRunners(..) --- .../SegmentMetadataQueryQueryToolChest.java | 2 +- .../SegmentMetadataQueryRunnerFactory.java | 69 +------------------ 2 files changed, 4 insertions(+), 67 deletions(-) diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 68f6dd021a8..779eeb0dfc2 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -216,7 +216,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest getOrdering() + public Ordering getOrdering() { return new Ordering() { diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index 0efebdea386..8974f90d99b 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -17,20 +17,13 @@ package io.druid.query.metadata; -import com.google.common.base.Function; -import com.google.common.base.Throwables; import com.google.common.collect.Maps; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.common.logger.Logger; -import io.druid.query.AbstractPrioritizedCallable; -import io.druid.query.ConcatQueryRunner; +import io.druid.query.ChainedExecutionQueryRunner; import io.druid.query.Query; -import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -44,11 +37,7 @@ import io.druid.segment.Segment; import java.util.Arrays; import java.util.Map; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory { @@ -122,60 +111,8 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory> queryRunners ) { - final ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator(exec); - return new ConcatQueryRunner( - Sequences.map( - Sequences.simple(queryRunners), - new Function, QueryRunner>() - { - @Override - public QueryRunner apply(final QueryRunner input) - { - return new QueryRunner() - { - @Override - public Sequence run( - final Query query, - final Map responseContext - ) - { - final int priority = query.getContextPriority(0); - ListenableFuture> future = queryExecutor.submit( - new AbstractPrioritizedCallable>(priority) - { - @Override - public Sequence call() throws Exception - { - return input.run(query, responseContext); - } - } - ); - try { - queryWatcher.registerQuery(query, future); - final Number timeout = query.getContextValue("timeout", (Number) null); - return timeout == null ? future.get() : future.get(timeout.longValue(), TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) { - log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); - future.cancel(true); - throw new QueryInterruptedException("Query interrupted"); - } - catch(CancellationException e) { - throw new QueryInterruptedException("Query cancelled"); - } - catch(TimeoutException e) { - log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); - future.cancel(true); - throw new QueryInterruptedException("Query timeout"); - } - catch (ExecutionException e) { - throw Throwables.propagate(e.getCause()); - } - } - }; - } - } - ) + return new ChainedExecutionQueryRunner<>( + exec, toolChest.getOrdering(), queryWatcher, queryRunners ); }