From 1be85af32019afd80252ebc2aa746640afa8fb04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Fri, 9 May 2014 22:51:43 -0700 Subject: [PATCH] handle query interruption at cursor level --- .../io/druid/segment/QueryableIndexStorageAdapter.java | 7 +++++++ .../incremental/IncrementalIndexStorageAdapter.java | 5 +++++ 2 files changed, 12 insertions(+) diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java index a71297b3b89..f624cdfa807 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java @@ -27,6 +27,7 @@ import com.google.common.io.Closeables; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.granularity.QueryGranularity; +import io.druid.query.QueryInterruptedException; import io.druid.query.filter.Filter; import io.druid.segment.column.Column; import io.druid.segment.column.ColumnCapabilities; @@ -224,6 +225,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter @Override public void advance() { + if (Thread.interrupted()) { + throw new QueryInterruptedException(); + } cursorOffset.increment(); } @@ -652,6 +656,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter @Override public void advance() { + if (Thread.interrupted()) { + throw new QueryInterruptedException(); + } ++currRow; } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 0eddf59ac98..3fe807b2761 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -29,6 +29,7 @@ import com.metamx.collections.spatial.search.Bound; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.granularity.QueryGranularity; +import io.druid.query.QueryInterruptedException; import io.druid.query.aggregation.Aggregator; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; @@ -239,6 +240,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter Iterators.advance(baseIter, numAdvanced); } + if (Thread.interrupted()) { + throw new QueryInterruptedException(); + } + boolean foundMatched = false; while (baseIter.hasNext()) { currEntry.set(baseIter.next());