groupBy v2: Fix dangling references. (#3500)

Acquiring references in the processing task prevents dangling references
caused by canceled processing tasks.
This commit is contained in:
Gian Merlino 2016-09-23 13:29:11 -07:00 committed by Nishant
parent 9226d4af3c
commit 7195be32d8
1 changed files with 24 additions and 41 deletions

View File

@ -205,49 +205,32 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
);
}
final Releaser bufferReleaser = mergeBufferHolder.increment();
try {
final Releaser grouperReleaser = grouperHolder.increment();
try {
return exec.submit(
new AbstractPrioritizedCallable<Boolean>(priority)
{
@Override
public Boolean call() throws Exception
{
try {
final Object retVal = input.run(queryForRunners, responseContext)
.accumulate(grouper, accumulator);
return exec.submit(
new AbstractPrioritizedCallable<Boolean>(priority)
{
@Override
public Boolean call() throws Exception
{
try (
Releaser bufferReleaser = mergeBufferHolder.increment();
Releaser grouperReleaser = grouperHolder.increment()
) {
final Object retVal = input.run(queryForRunners, responseContext)
.accumulate(grouper, accumulator);
// Return true if OK, false if resources were exhausted.
return retVal == grouper;
}
catch (QueryInterruptedException e) {
throw e;
}
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");
throw Throwables.propagate(e);
}
finally {
grouperReleaser.close();
bufferReleaser.close();
}
}
// Return true if OK, false if resources were exhausted.
return retVal == grouper;
}
);
}
catch (Exception e) {
// Exception caught while submitting the task; release resources.
grouperReleaser.close();
throw e;
}
}
catch (Exception e) {
// Exception caught while submitting the task; release resources.
bufferReleaser.close();
throw e;
}
catch (QueryInterruptedException e) {
throw e;
}
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");
throw Throwables.propagate(e);
}
}
}
);
}
}
)