Use Closer instead of List<Closeable> (#8235)

* Use Closer instead of List<Closeable>

* Process comments

* Catch an Exception instead

* Removed unused import
This commit is contained in:
Fokko Driesprong 2019-08-07 08:29:03 +02:00 committed by Benedict Jin
parent 5e57492298
commit 7702005f8f
3 changed files with 36 additions and 22 deletions

View File

@ -108,12 +108,11 @@ public final class Closer implements Closeable
}
/**
* Registers the given {@code closeable} to be closed when this {@code Closer} is
* Registers the given {@code Closeable} to be closed when this {@code Closer} is
* {@linkplain #close closed}.
*
* @return the given {@code closeable}
* @return the given {@code Closeable}
*/
// close. this word no longer has any meaning to me.
public <C extends Closeable> C register(@Nullable C closeable)
{
if (closeable != null) {
@ -123,6 +122,18 @@ public final class Closer implements Closeable
return closeable;
}
/**
* Registers a list of {@code Closeable} to be closed when this {@code Closer} is
* {@linkplain #close closed}.
*
* @return the supplied list of {@code Closeable}
*/
public <C extends Closeable> Iterable<C> registerAll(Iterable<C> closeables)
{
closeables.forEach(this::register);
return closeables;
}
/**
* Stores the given throwable and rethrows it. It will be rethrown as is if it is an
* {@code IOException}, {@code RuntimeException} or {@code Error}. Otherwise, it will be rethrown

View File

@ -39,8 +39,8 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.AbstractPrioritizedCallable;
import org.apache.druid.query.ChainedExecutionQueryRunner;
@ -58,7 +58,6 @@ import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBas
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CancellationException;
@ -166,7 +165,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
@Override
public CloseableGrouperIterator<RowBasedKey, ResultRow> make()
{
final List<ReferenceCountingResourceHolder> resources = new ArrayList<>();
final Closer resources = Closer.create();
try {
final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage(
@ -175,7 +174,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
);
final ReferenceCountingResourceHolder<LimitedTemporaryStorage> temporaryStorageHolder =
ReferenceCountingResourceHolder.fromCloseable(temporaryStorage);
resources.add(temporaryStorageHolder);
resources.register(temporaryStorageHolder);
// If parallelCombine is enabled, we need two merge buffers for parallel aggregating and parallel combining
final int numMergeBuffers = querySpecificConfig.getNumParallelCombineThreads() > 1 ? 2 : 1;
@ -185,7 +184,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
hasTimeout,
timeoutAt
);
resources.addAll(mergeBufferHolders);
resources.registerAll(mergeBufferHolders);
final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder = mergeBufferHolders.get(0);
final ReferenceCountingResourceHolder<ByteBuffer> combineBufferHolder = numMergeBuffers == 2 ?
@ -214,7 +213,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
final ReferenceCountingResourceHolder<Grouper<RowBasedKey>> grouperHolder =
ReferenceCountingResourceHolder.fromCloseable(grouper);
resources.add(grouperHolder);
resources.register(grouperHolder);
ListenableFuture<List<AggregateResult>> futures = Futures.allAsList(
Lists.newArrayList(
@ -280,13 +279,18 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
return RowBasedGrouperHelper.makeGrouperIterator(
grouper,
query,
() -> Lists.reverse(resources).forEach(CloseQuietly::close)
resources
);
}
catch (Throwable e) {
catch (Throwable t) {
// Exception caught while setting up the iterator; release resources.
Lists.reverse(resources).forEach(CloseQuietly::close);
throw e;
try {
resources.close();
}
catch (Exception ex) {
t.addSuppressed(ex);
}
throw t;
}
}

View File

@ -21,14 +21,13 @@ package org.apache.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
@ -40,8 +39,8 @@ import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@ -95,7 +94,7 @@ public class GroupByRowProcessor
final int mergeBufferSize
)
{
final List<Closeable> closeOnExit = new ArrayList<>();
final Closer closeOnExit = Closer.create();
final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
final File temporaryStorageDirectory = new File(
@ -108,7 +107,7 @@ public class GroupByRowProcessor
querySpecificConfig.getMaxOnDiskStorage()
);
closeOnExit.add(temporaryStorage);
closeOnExit.register(temporaryStorage);
Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, ResultRow>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
query,
@ -120,7 +119,7 @@ public class GroupByRowProcessor
public ByteBuffer get()
{
final ResourceHolder<ByteBuffer> mergeBufferHolder = resource.getMergeBuffer();
closeOnExit.add(mergeBufferHolder);
closeOnExit.register(mergeBufferHolder);
return mergeBufferHolder.get();
}
},
@ -130,7 +129,7 @@ public class GroupByRowProcessor
);
final Grouper<RowBasedKey> grouper = pair.lhs;
final Accumulator<AggregateResult, ResultRow> accumulator = pair.rhs;
closeOnExit.add(grouper);
closeOnExit.register(grouper);
final AggregateResult retVal = rows.accumulate(AggregateResult.ok(), accumulator);
@ -147,9 +146,9 @@ public class GroupByRowProcessor
}
@Override
public void close()
public void close() throws IOException
{
Lists.reverse(closeOnExit).forEach(CloseQuietly::close);
closeOnExit.close();
}
};
}