From da0cc32bc84d85ae359601486780e91301872dda Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Thu, 21 May 2015 21:13:45 -0500 Subject: [PATCH] Revert commit 55ebf0cfdf5c290a462f188d71c806b595eec689 which caused following regression it fails when we issue the SegmentMetadataQuery by setting {"bySegment" : true} in context with exception - java.lang.ClassCastException: io.druid.query.Result cannot be cast to io.druid.query.metadata.metadata.SegmentAnalysis at io.druid.query.metadata.SegmentMetadataQueryQueryToolChest$4.compare(SegmentMetadataQueryQueryToolChest.java:222) ~[druid-processing-0.7.3-SNAPSHOT.jar:0.7.3-SNAPSHOT] at com.google.common.collect.NullsFirstOrdering.compare(NullsFirstOrdering.java:44) ~[guava-16.0.1.jar:?] at com.metamx.common.guava.MergeIterator$1.compare(MergeIterator.java:46) ~[java-util-0.27.0.jar:?] at com.metamx.common.guava.MergeIterator$1.compare(MergeIterator.java:42) ~[java-util-0.27.0.jar:?] at java.util.PriorityQueue.siftUpUsingComparator(PriorityQueue.java:649) ~[?:1.7.0_80] --- .../SegmentMetadataQueryQueryToolChest.java | 2 +- .../SegmentMetadataQueryRunnerFactory.java | 69 ++++++++++++++++++- 2 files changed, 67 insertions(+), 4 deletions(-) diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 1630405f4e7..10971d8a325 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -216,7 +216,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest getOrdering() + private Ordering getOrdering() { return new Ordering() { diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index 8974f90d99b..0efebdea386 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -17,13 +17,20 @@ package io.druid.query.metadata; +import com.google.common.base.Function; +import com.google.common.base.Throwables; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.common.logger.Logger; -import io.druid.query.ChainedExecutionQueryRunner; +import io.druid.query.AbstractPrioritizedCallable; +import io.druid.query.ConcatQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -37,7 +44,11 @@ import io.druid.segment.Segment; import java.util.Arrays; import java.util.Map; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory { @@ -111,8 +122,60 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory> queryRunners ) { - return new ChainedExecutionQueryRunner<>( - exec, toolChest.getOrdering(), queryWatcher, queryRunners + final ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator(exec); + return new ConcatQueryRunner( + Sequences.map( + Sequences.simple(queryRunners), + new Function, QueryRunner>() + { + @Override + public QueryRunner apply(final QueryRunner input) + { + return new QueryRunner() + { + @Override + public Sequence run( + final Query query, + final Map responseContext + ) + { + final int priority = query.getContextPriority(0); + ListenableFuture> future = queryExecutor.submit( + new AbstractPrioritizedCallable>(priority) + { + @Override + public Sequence call() throws Exception + { + return input.run(query, responseContext); + } + } + ); + try { + queryWatcher.registerQuery(query, future); + final Number timeout = query.getContextValue("timeout", (Number) null); + return timeout == null ? future.get() : future.get(timeout.longValue(), TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); + future.cancel(true); + throw new QueryInterruptedException("Query interrupted"); + } + catch(CancellationException e) { + throw new QueryInterruptedException("Query cancelled"); + } + catch(TimeoutException e) { + log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); + future.cancel(true); + throw new QueryInterruptedException("Query timeout"); + } + catch (ExecutionException e) { + throw Throwables.propagate(e.getCause()); + } + } + }; + } + } + ) ); }