From 7195be32d85478c5ea23cc12d99ed4c8afa423ef Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 23 Sep 2016 13:29:11 -0700 Subject: [PATCH] groupBy v2: Fix dangling references. (#3500) Acquiring references in the processing task prevents dangling references caused by canceled processing tasks. --- .../GroupByMergingQueryRunnerV2.java | 65 +++++++------------ 1 file changed, 24 insertions(+), 41 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 1be7b4ce048..c2aee4ffa98 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -205,49 +205,32 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner ); } - final Releaser bufferReleaser = mergeBufferHolder.increment(); - try { - final Releaser grouperReleaser = grouperHolder.increment(); - try { - return exec.submit( - new AbstractPrioritizedCallable(priority) - { - @Override - public Boolean call() throws Exception - { - try { - final Object retVal = input.run(queryForRunners, responseContext) - .accumulate(grouper, accumulator); + return exec.submit( + new AbstractPrioritizedCallable(priority) + { + @Override + public Boolean call() throws Exception + { + try ( + Releaser bufferReleaser = mergeBufferHolder.increment(); + Releaser grouperReleaser = grouperHolder.increment() + ) { + final Object retVal = input.run(queryForRunners, responseContext) + .accumulate(grouper, accumulator); - // Return true if OK, false if resources were exhausted. - return retVal == grouper; - } - catch (QueryInterruptedException e) { - throw e; - } - catch (Exception e) { - log.error(e, "Exception with one of the sequences!"); - throw Throwables.propagate(e); - } - finally { - grouperReleaser.close(); - bufferReleaser.close(); - } - } + // Return true if OK, false if resources were exhausted. + return retVal == grouper; } - ); - } - catch (Exception e) { - // Exception caught while submitting the task; release resources. - grouperReleaser.close(); - throw e; - } - } - catch (Exception e) { - // Exception caught while submitting the task; release resources. - bufferReleaser.close(); - throw e; - } + catch (QueryInterruptedException e) { + throw e; + } + catch (Exception e) { + log.error(e, "Exception with one of the sequences!"); + throw Throwables.propagate(e); + } + } + } + ); } } )