From 1aae5bd67d8c42dbcb76c4fdf086bd7dcb043bda Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 5 Aug 2016 14:48:06 -0700 Subject: [PATCH] Nicer handling for cancelled groupBy v2 queries. (#3330) 1. Wrap temporaryStorage in a resource holder, to avoid spurious "Closed" errors from already-running processing tasks. 2. Exit early from the merging accumulator if the query is cancelled. --- .../ReferenceCountingResourceHolder.java | 5 +++ .../CloseableGrouperIterator.java | 2 -- .../GroupByMergingQueryRunnerV2.java | 32 +++++++------------ .../epinephelinae/RowBasedGrouperHelper.java | 5 +++ 4 files changed, 22 insertions(+), 22 deletions(-) diff --git a/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java b/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java index a83c50c8b5e..74cf1ccdac5 100644 --- a/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java +++ b/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java @@ -42,6 +42,11 @@ public class ReferenceCountingResourceHolder implements ResourceHolder this.closer = closer; } + public static ReferenceCountingResourceHolder fromCloseable(final T object) + { + return new ReferenceCountingResourceHolder<>(object, object); + } + @Override public T get() { diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/CloseableGrouperIterator.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/CloseableGrouperIterator.java index a07a3d764ee..e7e3e102f4e 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/CloseableGrouperIterator.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/CloseableGrouperIterator.java @@ -28,7 +28,6 @@ import java.util.Iterator; public class CloseableGrouperIterator, T> implements Iterator, Closeable { - private final Grouper grouper; private final Function, T> transformer; private final Closeable closer; private final Iterator> iterator; @@ -40,7 +39,6 @@ public class CloseableGrouperIterator, T> im final Closeable closer ) { - this.grouper = grouper; this.transformer = transformer; this.closer = closer; this.iterator = grouper.iterator(sorted); diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 3b467e5917b..cffa996af41 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -148,15 +148,16 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner @Override public CloseableGrouperIterator make() { - final List closeOnFailure = Lists.newArrayList(); + final List resources = Lists.newArrayList(); try { final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage( temporaryStorageDirectory, querySpecificConfig.getMaxOnDiskStorage() ); - - closeOnFailure.add(temporaryStorage); + final ReferenceCountingResourceHolder temporaryStorageHolder = + ReferenceCountingResourceHolder.fromCloseable(temporaryStorage); + resources.add(temporaryStorageHolder); final ReferenceCountingResourceHolder mergeBufferHolder; try { @@ -165,7 +166,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) { throw new QueryInterruptedException(new TimeoutException()); } - closeOnFailure.add(mergeBufferHolder); + resources.add(mergeBufferHolder); } catch (InterruptedException e) { throw new QueryInterruptedException(e); @@ -184,18 +185,9 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner final Grouper grouper = pair.lhs; final Accumulator, Row> accumulator = pair.rhs; - final ReferenceCountingResourceHolder> grouperHolder = new ReferenceCountingResourceHolder<>( - grouper, - new Closeable() - { - @Override - public void close() throws IOException - { - grouper.close(); - } - } - ); - closeOnFailure.add(grouperHolder); + final ReferenceCountingResourceHolder> grouperHolder = + ReferenceCountingResourceHolder.fromCloseable(grouper); + resources.add(grouperHolder); ListenableFuture> futures = Futures.allAsList( Lists.newArrayList( @@ -271,16 +263,16 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner @Override public void close() throws IOException { - grouperHolder.close(); - mergeBufferHolder.close(); - CloseQuietly.close(temporaryStorage); + for (Closeable closeable : Lists.reverse(resources)) { + CloseQuietly.close(closeable); + } } } ); } catch (Throwable e) { // Exception caught while setting up the iterator; release resources. - for (Closeable closeable : Lists.reverse(closeOnFailure)) { + for (Closeable closeable : Lists.reverse(resources)) { CloseQuietly.close(closeable); } throw e; diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index ebebab99af9..02e01ca4f63 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -35,6 +35,7 @@ import com.metamx.common.guava.Accumulator; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.granularity.AllGranularity; +import io.druid.query.QueryInterruptedException; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.dimension.DimensionSpec; import io.druid.query.extraction.ExtractionFn; @@ -126,6 +127,10 @@ public class RowBasedGrouperHelper final Row row ) { + if (Thread.interrupted()) { + throw new QueryInterruptedException(new InterruptedException()); + } + if (theGrouper == null) { // Pass-through null returns without doing more work. return null;