mirror of https://github.com/apache/druid.git
proper query exceptions and add support for query timeout
This commit is contained in:
parent
1183c68ce4
commit
a56a655eae
|
@ -39,8 +39,11 @@ import com.metamx.common.logger.Logger;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CancellationException;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A QueryRunner that combines a list of other QueryRunners and executes them in parallel on an executor.
|
* A QueryRunner that combines a list of other QueryRunners and executes them in parallel on an executor.
|
||||||
|
@ -149,15 +152,26 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
|
||||||
queryWatcher.registerQuery(query, futures);
|
queryWatcher.registerQuery(query, futures);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
final Number timeout = query.getContextValue("timeout", (Number)null);
|
||||||
return new MergeIterable<>(
|
return new MergeIterable<>(
|
||||||
ordering.nullsFirst(),
|
ordering.nullsFirst(),
|
||||||
futures.get()
|
timeout == null ?
|
||||||
|
futures.get() :
|
||||||
|
futures.get(timeout.longValue(), TimeUnit.MILLISECONDS)
|
||||||
).iterator();
|
).iterator();
|
||||||
}
|
}
|
||||||
catch (InterruptedException e) {
|
catch (InterruptedException e) {
|
||||||
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
|
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
|
||||||
futures.cancel(true);
|
futures.cancel(true);
|
||||||
throw new QueryInterruptedException(e);
|
throw new QueryInterruptedException("Query interrupted");
|
||||||
|
}
|
||||||
|
catch(CancellationException e) {
|
||||||
|
log.warn(e, "Query cancelled, query id [%s]", query.getId());
|
||||||
|
throw new QueryInterruptedException("Query cancelled");
|
||||||
|
}
|
||||||
|
catch(TimeoutException e) {
|
||||||
|
log.warn(e, "Query timeout, query id [%s]", query.getId());
|
||||||
|
throw new QueryInterruptedException("Query timeout");
|
||||||
}
|
}
|
||||||
catch (ExecutionException e) {
|
catch (ExecutionException e) {
|
||||||
throw Throwables.propagate(e.getCause());
|
throw Throwables.propagate(e.getCause());
|
||||||
|
|
Loading…
Reference in New Issue