Nicer handling for cancelled groupBy v2 queries. (#3330)

1. Wrap temporaryStorage in a resource holder, to avoid spurious "Closed"
   errors from already-running processing tasks.
2. Exit early from the merging accumulator if the query is cancelled.
This commit is contained in:
Gian Merlino 2016-08-05 14:48:06 -07:00 committed by Jonathan Wei
parent decefb7477
commit 1aae5bd67d
4 changed files with 22 additions and 22 deletions

View File

@ -42,6 +42,11 @@ public class ReferenceCountingResourceHolder<T> implements ResourceHolder<T>
this.closer = closer;
}
public static <T extends Closeable> ReferenceCountingResourceHolder<T> fromCloseable(final T object)
{
return new ReferenceCountingResourceHolder<>(object, object);
}
@Override
public T get()
{

View File

@ -28,7 +28,6 @@ import java.util.Iterator;
public class CloseableGrouperIterator<KeyType extends Comparable<KeyType>, T> implements Iterator<T>, Closeable
{
private final Grouper<KeyType> grouper;
private final Function<Grouper.Entry<KeyType>, T> transformer;
private final Closeable closer;
private final Iterator<Grouper.Entry<KeyType>> iterator;
@ -40,7 +39,6 @@ public class CloseableGrouperIterator<KeyType extends Comparable<KeyType>, T> im
final Closeable closer
)
{
this.grouper = grouper;
this.transformer = transformer;
this.closer = closer;
this.iterator = grouper.iterator(sorted);

View File

@ -148,15 +148,16 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
@Override
public CloseableGrouperIterator<RowBasedKey, Row> make()
{
final List<Closeable> closeOnFailure = Lists.newArrayList();
final List<ReferenceCountingResourceHolder> resources = Lists.newArrayList();
try {
final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage(
temporaryStorageDirectory,
querySpecificConfig.getMaxOnDiskStorage()
);
closeOnFailure.add(temporaryStorage);
final ReferenceCountingResourceHolder<LimitedTemporaryStorage> temporaryStorageHolder =
ReferenceCountingResourceHolder.fromCloseable(temporaryStorage);
resources.add(temporaryStorageHolder);
final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder;
try {
@ -165,7 +166,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) {
throw new QueryInterruptedException(new TimeoutException());
}
closeOnFailure.add(mergeBufferHolder);
resources.add(mergeBufferHolder);
}
catch (InterruptedException e) {
throw new QueryInterruptedException(e);
@ -184,18 +185,9 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
final Grouper<RowBasedKey> grouper = pair.lhs;
final Accumulator<Grouper<RowBasedKey>, Row> accumulator = pair.rhs;
final ReferenceCountingResourceHolder<Grouper<RowBasedKey>> grouperHolder = new ReferenceCountingResourceHolder<>(
grouper,
new Closeable()
{
@Override
public void close() throws IOException
{
grouper.close();
}
}
);
closeOnFailure.add(grouperHolder);
final ReferenceCountingResourceHolder<Grouper<RowBasedKey>> grouperHolder =
ReferenceCountingResourceHolder.fromCloseable(grouper);
resources.add(grouperHolder);
ListenableFuture<List<Boolean>> futures = Futures.allAsList(
Lists.newArrayList(
@ -271,16 +263,16 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
@Override
public void close() throws IOException
{
grouperHolder.close();
mergeBufferHolder.close();
CloseQuietly.close(temporaryStorage);
for (Closeable closeable : Lists.reverse(resources)) {
CloseQuietly.close(closeable);
}
}
}
);
}
catch (Throwable e) {
// Exception caught while setting up the iterator; release resources.
for (Closeable closeable : Lists.reverse(closeOnFailure)) {
for (Closeable closeable : Lists.reverse(resources)) {
CloseQuietly.close(closeable);
}
throw e;

View File

@ -35,6 +35,7 @@ import com.metamx.common.guava.Accumulator;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.granularity.AllGranularity;
import io.druid.query.QueryInterruptedException;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
@ -126,6 +127,10 @@ public class RowBasedGrouperHelper
final Row row
)
{
if (Thread.interrupted()) {
throw new QueryInterruptedException(new InterruptedException());
}
if (theGrouper == null) {
// Pass-through null returns without doing more work.
return null;