handle query interruption at cursor level

This commit is contained in:
Xavier Léauté 2014-05-09 22:51:43 -07:00
parent d6f38827db
commit 1be85af320
2 changed files with 12 additions and 0 deletions

View File

@ -27,6 +27,7 @@ import com.google.common.io.Closeables;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.granularity.QueryGranularity;
import io.druid.query.QueryInterruptedException;
import io.druid.query.filter.Filter;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
@ -224,6 +225,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public void advance()
{
if (Thread.interrupted()) {
throw new QueryInterruptedException();
}
cursorOffset.increment();
}
@ -652,6 +656,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public void advance()
{
if (Thread.interrupted()) {
throw new QueryInterruptedException();
}
++currRow;
}

View File

@ -29,6 +29,7 @@ import com.metamx.collections.spatial.search.Bound;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.granularity.QueryGranularity;
import io.druid.query.QueryInterruptedException;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher;
@ -239,6 +240,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
Iterators.advance(baseIter, numAdvanced);
}
if (Thread.interrupted()) {
throw new QueryInterruptedException();
}
boolean foundMatched = false;
while (baseIter.hasNext()) {
currEntry.set(baseIter.next());