diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 6c12d37669b..c7ed29f1ddf 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -39,8 +39,11 @@ import com.metamx.common.logger.Logger; import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * A QueryRunner that combines a list of other QueryRunners and executes them in parallel on an executor. @@ -149,15 +152,26 @@ public class ChainedExecutionQueryRunner implements QueryRunner queryWatcher.registerQuery(query, futures); try { + final Number timeout = query.getContextValue("timeout", (Number)null); return new MergeIterable<>( ordering.nullsFirst(), - futures.get() + timeout == null ? + futures.get() : + futures.get(timeout.longValue(), TimeUnit.MILLISECONDS) ).iterator(); } catch (InterruptedException e) { log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); futures.cancel(true); - throw new QueryInterruptedException(e); + throw new QueryInterruptedException("Query interrupted"); + } + catch(CancellationException e) { + log.warn(e, "Query cancelled, query id [%s]", query.getId()); + throw new QueryInterruptedException("Query cancelled"); + } + catch(TimeoutException e) { + log.warn(e, "Query timeout, query id [%s]", query.getId()); + throw new QueryInterruptedException("Query timeout"); } catch (ExecutionException e) { throw Throwables.propagate(e.getCause());