From 98ecbb21cd5715139da40ede8b24759f8ea4e47a Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sat, 23 Oct 2021 17:03:21 -0700 Subject: [PATCH] Remove CloseQuietly and migrate its usages to other methods. (#10247) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Remove CloseQuietly and migrate its usages to other methods. These other methods include: 1) New method CloseableUtils.closeAndWrapExceptions, which wraps IOExceptions in RuntimeExceptions for callers that just want to avoid dealing with checked exceptions. Most usages were migrated to this method, because it looks like they were mainly attempts to avoid declaring a throws clause, and perhaps were unintentionally suppressing IOExceptions. 2) New method CloseableUtils.closeInCatch, designed to properly close something in a catch block without losing exceptions. Some usages from catch blocks were migrated here, when it seemed that they were intended to avoid checked exception handling, and did not really intend to also suppress IOExceptions. 3) New method CloseableUtils.closeAndSuppressExceptions, which sends all exceptions to a "chomper" that consumes them. Nothing is thrown or returned. The behavior is slightly different: with this method, _all_ exceptions are suppressed, not just IOExceptions. Calls that seemed like they had good reason to suppress exceptions were migrated here. 4) Some calls were migrated to try-with-resources, in cases where it appeared that CloseQuietly was being used to avoid throwing an exception in a finally block. 🎵 You don't have to go home, but you can't stay here... 🎵 * Remove unused import. * Fix up various issues. * Adjustments to tests. * Fix null handling. * Additional test. * Adjustments from review. * Fixup style stuff. * Fix NPE caused by holder starting out null. * Fix spelling. * Chomp Throwables too. --- .../input/impl/prefetch/JsonIterator.java | 12 +- .../java/util/common/guava/CloseQuietly.java | 45 -- .../guava/ParallelMergeCombiningSequence.java | 34 +- .../java/util/http/client/HttpClientInit.java | 35 +- .../SequenceInputStreamResponseHandler.java | 21 +- .../apache/druid/utils/CloseableUtils.java | 134 ++++- .../druid/utils/CloseableUtilsTest.java | 465 ++++++++++++++++++ .../k8s/discovery/K8sDruidLeaderSelector.java | 24 +- .../K8sDruidNodeDiscoveryProvider.java | 5 +- .../AppenderatorDriverRealtimeIndexTask.java | 6 +- .../common/task/RealtimeIndexTask.java | 10 +- .../indexing/input/DruidSegmentReader.java | 7 +- .../indexing/overlord/http/OverlordTest.java | 6 +- .../apache/druid/guice/PropertiesModule.java | 38 +- .../query/groupby/GroupByQueryEngine.java | 15 +- .../groupby/strategy/GroupByStrategyV2.java | 21 +- .../query/scan/ScanQueryQueryToolChest.java | 4 +- .../druid/query/topn/PooledTopNAlgorithm.java | 4 +- .../QueryableIndexIndexableAdapter.java | 5 +- .../column/StringDictionaryEncodedColumn.java | 11 +- .../BlockLayoutColumnarDoublesSupplier.java | 5 +- .../BlockLayoutColumnarFloatsSupplier.java | 5 +- .../BlockLayoutColumnarLongsSupplier.java | 9 +- .../data/CompressedColumnarIntsSupplier.java | 15 +- .../CompressedVSizeColumnarIntsSupplier.java | 5 +- .../druid/segment/data/GenericIndexed.java | 6 +- .../druid/segment/join/HashJoinSegment.java | 13 +- .../aggregation/AggregationTestHelper.java | 4 +- .../StringColumnAggregationTest.java | 4 +- .../CompressedColumnarIntsSerializerTest.java | 6 +- .../CompressedColumnarIntsSupplierTest.java | 12 +- .../data/CompressedDoublesSerdeTest.java | 4 +- .../data/CompressedFloatsSerdeTest.java | 29 +- .../data/CompressedLongsSerdeTest.java | 4 +- ...ressedVSizeColumnarIntsSerializerTest.java | 7 +- ...mpressedVSizeColumnarIntsSupplierTest.java | 25 +- ...dVSizeColumnarMultiIntsSerializerTest.java | 16 +- .../druid/client/DirectDruidClient.java | 4 +- .../druid/client/JsonParserIterator.java | 4 +- .../druid/client/cache/HybridCache.java | 2 +- .../druid/curator/announcement/Announcer.java | 18 +- .../discovery/CuratorDruidLeaderSelector.java | 13 +- .../CuratorDruidNodeDiscoveryProvider.java | 4 +- .../appenderator/SinkQuerySegmentWalker.java | 7 +- .../firehose/TimedShutoffFirehoseFactory.java | 2 +- .../druid/server/log/FileRequestLogger.java | 15 +- .../druid/client/cache/HybridCacheTest.java | 14 +- 47 files changed, 869 insertions(+), 285 deletions(-) delete mode 100644 core/src/main/java/org/apache/druid/java/util/common/guava/CloseQuietly.java create mode 100644 core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java diff --git a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java index c03e5f6d8e8..5abadb6c2ad 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java @@ -25,9 +25,8 @@ import com.fasterxml.jackson.core.ObjectCodec; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.guava.CloseQuietly; -import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.utils.CloseableUtils; import java.io.Closeable; import java.io.IOException; @@ -80,7 +79,7 @@ public class JsonIterator implements CloseableIterator return false; } if (jp.getCurrentToken() == JsonToken.END_ARRAY) { - CloseQuietly.close(jp); + CloseableUtils.closeAndWrapExceptions(jp); return false; } return true; @@ -131,11 +130,6 @@ public class JsonIterator implements CloseableIterator @Override public void close() throws IOException { - Closer closer = Closer.create(); - if (jp != null) { - closer.register(jp); - } - closer.register(resourceCloser); - closer.close(); + CloseableUtils.closeAll(jp, resourceCloser); } } diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/CloseQuietly.java b/core/src/main/java/org/apache/druid/java/util/common/guava/CloseQuietly.java deleted file mode 100644 index 8b4807b9045..00000000000 --- a/core/src/main/java/org/apache/druid/java/util/common/guava/CloseQuietly.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.java.util.common.guava; - -import org.apache.druid.java.util.common.logger.Logger; - -import java.io.Closeable; -import java.io.IOException; - -/** - */ -public class CloseQuietly -{ - private static final Logger log = new Logger(CloseQuietly.class); - - public static void close(Closeable closeable) - { - if (closeable == null) { - return; - } - try { - closeable.close(); - } - catch (IOException e) { - log.error(e, "IOException thrown while closing Closeable."); - } - } -} diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java index 2a5c489cf72..749f3e64144 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java +++ b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.QueryTimeoutException; +import org.apache.druid.utils.CloseableUtils; import org.apache.druid.utils.JvmUtils; import javax.annotation.Nullable; @@ -175,6 +176,7 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase new BaseSequence.IteratorMaker>() { private boolean shouldCancelOnCleanup = true; + @Override public Iterator make() { @@ -463,18 +465,19 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase final int computedNumParallelTasks = Math.max(computedOptimalParallelism, 1); - LOG.debug("Computed parallel tasks: [%s]; ForkJoinPool details - sequence parallelism: [%s] " - + "active threads: [%s] running threads: [%s] queued submissions: [%s] queued tasks: [%s] " - + "pool parallelism: [%s] pool size: [%s] steal count: [%s]", - computedNumParallelTasks, - parallelism, - getPool().getActiveThreadCount(), - runningThreadCount, - submissionCount, - getPool().getQueuedTaskCount(), - getPool().getParallelism(), - getPool().getPoolSize(), - getPool().getStealCount() + LOG.debug( + "Computed parallel tasks: [%s]; ForkJoinPool details - sequence parallelism: [%s] " + + "active threads: [%s] running threads: [%s] queued submissions: [%s] queued tasks: [%s] " + + "pool parallelism: [%s] pool size: [%s] steal count: [%s]", + computedNumParallelTasks, + parallelism, + getPool().getActiveThreadCount(), + runningThreadCount, + submissionCount, + getPool().getQueuedTaskCount(), + getPool().getParallelism(), + getPool().getPoolSize(), + getPool().getStealCount() ); return computedNumParallelTasks; @@ -609,7 +612,10 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase // which we want to target a 10ms task run time. smooth this value with a cumulative moving average in order // to prevent normal jitter in processing time from skewing the next yield value too far in any direction final long elapsedNanos = System.nanoTime() - start; - final double nextYieldAfter = Math.max((double) targetTimeNanos * ((double) yieldAfter / elapsedCpuNanos), 1.0); + final double nextYieldAfter = Math.max( + (double) targetTimeNanos * ((double) yieldAfter / elapsedCpuNanos), + 1.0 + ); final long recursionDepth = metricsAccumulator.getTaskCount(); final double cumulativeMovingAverage = (nextYieldAfter + (recursionDepth * yieldAfter)) / (recursionDepth + 1); @@ -1376,6 +1382,6 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase { Closer closer = Closer.create(); closer.registerAll(cursors); - CloseQuietly.close(closer); + CloseableUtils.closeAndSuppressExceptions(closer, e -> LOG.warn(e, "Failed to close result cursors")); } } diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java index 68555b002b5..279a6a8b51d 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java @@ -19,8 +19,8 @@ package org.apache.druid.java.util.http.client; +import com.google.common.base.Throwables; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.http.client.netty.HttpClientPipelineFactory; import org.apache.druid.java.util.http.client.pool.ChannelResourceFactory; @@ -39,17 +39,12 @@ import org.jboss.netty.util.Timer; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.security.KeyManagementException; import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.cert.CertificateException; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** + * */ public class HttpClientInit { @@ -110,11 +105,8 @@ public class HttpClientInit public static SSLContext sslContextWithTrustedKeyStore(final String keyStorePath, final String keyStorePassword) { - FileInputStream in = null; - try { + try (FileInputStream in = new FileInputStream(keyStorePath)) { final KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); - - in = new FileInputStream(keyStorePath); ks.load(in, keyStorePassword.toCharArray()); in.close(); @@ -125,27 +117,10 @@ public class HttpClientInit return sslContext; } - catch (CertificateException e) { + catch (Exception e) { + Throwables.propagateIfPossible(e); throw new RuntimeException(e); } - catch (NoSuchAlgorithmException e) { - throw new RuntimeException(e); - } - catch (KeyStoreException e) { - throw new RuntimeException(e); - } - catch (KeyManagementException e) { - throw new RuntimeException(e); - } - catch (FileNotFoundException e) { - throw new RuntimeException(e); - } - catch (IOException e) { - throw new RuntimeException(e); - } - finally { - CloseQuietly.close(in); - } } private static ClientBootstrap createBootstrap(Lifecycle lifecycle, Timer timer, int bossPoolSize, int workerPoolSize) diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java index e395a717239..e1094ba8908 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java @@ -20,7 +20,6 @@ package org.apache.druid.java.util.http.client.response; import com.google.common.io.ByteSource; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.logger.Logger; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBufferInputStream; @@ -57,19 +56,17 @@ public class SequenceInputStreamResponseHandler implements HttpResponseHandler handleResponse(HttpResponse response, TrafficCop trafficCop) { - ChannelBufferInputStream channelStream = null; - try { - channelStream = new ChannelBufferInputStream(response.getContent()); + try (ChannelBufferInputStream channelStream = new ChannelBufferInputStream(response.getContent())) { queue.put(channelStream); } + catch (IOException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { log.error(e, "Queue appending interrupted"); Thread.currentThread().interrupt(); throw new RuntimeException(e); } - finally { - CloseQuietly.close(channelStream); - } byteCount.addAndGet(response.getContent().readableBytes()); return ClientResponse.finished( new SequenceInputStream( @@ -112,21 +109,19 @@ public class SequenceInputStreamResponseHandler implements HttpResponseHandler 0) { - ChannelBufferInputStream channelStream = null; - try { - channelStream = new ChannelBufferInputStream(channelBuffer); + try (ChannelBufferInputStream channelStream = new ChannelBufferInputStream(channelBuffer)) { queue.put(channelStream); // Queue.size() can be expensive in some implementations, but LinkedBlockingQueue.size is just an AtomicLong log.debug("Added stream. Queue length %d", queue.size()); } + catch (IOException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { log.warn(e, "Thread interrupted while adding to queue"); Thread.currentThread().interrupt(); throw new RuntimeException(e); } - finally { - CloseQuietly.close(channelStream); - } byteCount.addAndGet(bytes); } else { log.debug("Skipping zero length chunk"); diff --git a/core/src/main/java/org/apache/druid/utils/CloseableUtils.java b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java index 768abbe86dc..6a0a143b13e 100644 --- a/core/src/main/java/org/apache/druid/utils/CloseableUtils.java +++ b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java @@ -19,11 +19,19 @@ package org.apache.druid.utils; +import com.google.common.collect.Lists; +import org.apache.druid.java.util.common.io.Closer; + +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Consumer; /** - * Methods in this class could have belonged to {@link org.apache.druid.java.util.common.io.Closer}, but not editing + * Methods in this class could have belonged to {@link Closer}, but not editing * that class to keep its source close to Guava source. */ public final class CloseableUtils @@ -34,15 +42,127 @@ public final class CloseableUtils * first.close(); * second.close(); * - * to have safety of {@link org.apache.druid.java.util.common.io.Closer}, but without associated boilerplate code + * to have safety of {@link Closer}, but without associated boilerplate code * of creating a Closer and registering objects in it. */ - public static void closeBoth(Closeable first, Closeable second) throws IOException + public static void closeAll(Closeable first, Closeable... others) throws IOException { - //noinspection EmptyTryBlock - try (Closeable ignore1 = second; - Closeable ignore2 = first) { - // piggy-back try-with-resources semantics + final List closeables = new ArrayList<>(others.length + 1); + closeables.add(first); + closeables.addAll(Arrays.asList(others)); + closeAll(closeables); + } + + /** + * Close all the provided {@param closeables}, from first to last. + */ + public static void closeAll(Iterable closeables) throws IOException + { + final Closer closer = Closer.create(); + + // Register in reverse order, so we close from first to last. + closer.registerAll(Lists.reverse(Lists.newArrayList(closeables))); + closer.close(); + } + + /** + * Like {@link Closeable#close()}, but guaranteed to throw {@param caught}. Will add any exceptions encountered + * during closing to {@param caught} using {@link Throwable#addSuppressed(Throwable)}. + * + * Should be used like {@code throw CloseableUtils.closeInCatch(e, closeable)}. (The "throw" is important for + * reachability detection.) + */ + public static RuntimeException closeInCatch( + final E caught, + @Nullable final Closeable closeable + ) throws E + { + if (caught == null) { + // Incorrect usage; throw an exception with an error message that may be useful to the programmer. + final RuntimeException e1 = new IllegalStateException("Must be called with non-null caught exception"); + + if (closeable != null) { + try { + closeable.close(); + } + catch (Throwable e2) { + e1.addSuppressed(e2); + } + } + + throw e1; + } + + if (closeable != null) { + try { + closeable.close(); + } + catch (Throwable e) { + caught.addSuppressed(e); + } + } + + throw caught; + } + + /** + * Like {@link #closeInCatch} but wraps {@param caught} in a {@link RuntimeException} if it is a checked exception. + */ + public static RuntimeException closeAndWrapInCatch( + final E caught, + @Nullable final Closeable closeable + ) + { + try { + throw closeInCatch(caught, closeable); + } + catch (RuntimeException | Error e) { + // Unchecked exception. + throw e; + } + catch (Throwable e) { + // Checked exception; must wrap. + throw new RuntimeException(e); + } + } + + /** + * Like {@link Closeable#close()} but wraps IOExceptions in RuntimeExceptions. + */ + public static void closeAndWrapExceptions(@Nullable final Closeable closeable) + { + if (closeable == null) { + return; + } + + try { + closeable.close(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Like {@link Closeable#close()} but sends any exceptions to the provided Consumer and then returns quietly. + * + * If the Consumer throws an exception, that exception is thrown by this method. So if your intent is to chomp + * exceptions, you should avoid writing a Consumer that might throw an exception. + */ + public static void closeAndSuppressExceptions( + @Nullable final Closeable closeable, + final Consumer chomper + ) + { + if (closeable == null) { + return; + } + + try { + closeable.close(); + } + catch (Throwable e) { + chomper.accept(e); } } diff --git a/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java b/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java new file mode 100644 index 00000000000..57fa61079d2 --- /dev/null +++ b/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java @@ -0,0 +1,465 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.utils; + +import com.google.common.base.Throwables; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Test; +import org.junit.internal.matchers.ThrowableCauseMatcher; +import org.junit.internal.matchers.ThrowableMessageMatcher; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +public class CloseableUtilsTest +{ + private final TestCloseable quietCloseable = new TestCloseable(null); + private final TestCloseable quietCloseable2 = new TestCloseable(null); + private final TestCloseable ioExceptionCloseable = new TestCloseable(new IOException()); + private final TestCloseable runtimeExceptionCloseable = new TestCloseable(new IllegalArgumentException()); + private final TestCloseable assertionErrorCloseable = new TestCloseable(new AssertionError()); + + // For closeAndSuppressException tests. + private final AtomicLong chomped = new AtomicLong(); + private final Consumer chomper = e -> chomped.incrementAndGet(); + + @Test + public void test_closeAll_array_quiet() throws IOException + { + CloseableUtils.closeAll(quietCloseable, null, quietCloseable2); + assertClosed(quietCloseable, quietCloseable2); + } + + @Test + public void test_closeAll_list_quiet() throws IOException + { + CloseableUtils.closeAll(Arrays.asList(quietCloseable, null, quietCloseable2)); + assertClosed(quietCloseable, quietCloseable2); + } + + @Test + public void test_closeAll_array_loud() + { + Exception e = null; + try { + CloseableUtils.closeAll(quietCloseable, null, ioExceptionCloseable, quietCloseable2, runtimeExceptionCloseable); + } + catch (Exception e2) { + e = e2; + } + + assertClosed(quietCloseable, ioExceptionCloseable, quietCloseable2, runtimeExceptionCloseable); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(IOException.class)); + + // Second exception + Assert.assertEquals(1, e.getSuppressed().length); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IllegalArgumentException.class)); + } + + @Test + public void test_closeAll_list_loud() + { + Exception e = null; + try { + CloseableUtils.closeAll( + Arrays.asList( + quietCloseable, + null, + ioExceptionCloseable, + quietCloseable2, + runtimeExceptionCloseable + ) + ); + } + catch (Exception e2) { + e = e2; + } + + assertClosed(quietCloseable, ioExceptionCloseable, quietCloseable2, runtimeExceptionCloseable); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(IOException.class)); + + // Second exception + Assert.assertEquals(1, e.getSuppressed().length); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IllegalArgumentException.class)); + } + + @Test + public void test_closeAndWrapExceptions_null() + { + CloseableUtils.closeAndWrapExceptions(null); + // Nothing happens. + } + + @Test + public void test_closeAndWrapExceptions_quiet() + { + CloseableUtils.closeAndWrapExceptions(quietCloseable); + assertClosed(quietCloseable); + } + + @Test + public void test_closeAndWrapExceptions_ioException() + { + Exception e = null; + try { + CloseableUtils.closeAndWrapExceptions(ioExceptionCloseable); + } + catch (Exception e1) { + e = e1; + } + + assertClosed(ioExceptionCloseable); + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + } + + @Test + public void test_closeAndWrapExceptions_runtimeException() + { + Exception e = null; + try { + CloseableUtils.closeAndWrapExceptions(runtimeExceptionCloseable); + } + catch (Exception e1) { + e = e1; + } + + assertClosed(runtimeExceptionCloseable); + Assert.assertThat(e, CoreMatchers.instanceOf(IllegalArgumentException.class)); + } + + @Test + public void test_closeAndWrapExceptions_assertionError() + { + Throwable e = null; + try { + CloseableUtils.closeAndWrapExceptions(assertionErrorCloseable); + } + catch (Throwable e1) { + e = e1; + } + + assertClosed(assertionErrorCloseable); + Assert.assertThat(e, CoreMatchers.instanceOf(AssertionError.class)); + } + + @Test + public void test_closeAndSuppressExceptions_null() + { + CloseableUtils.closeAndSuppressExceptions(null, chomper); + Assert.assertEquals(0, chomped.get()); + } + + @Test + public void test_closeAndSuppressExceptions_quiet() + { + CloseableUtils.closeAndSuppressExceptions(quietCloseable, chomper); + assertClosed(quietCloseable); + Assert.assertEquals(0, chomped.get()); + } + + @Test + public void test_closeAndSuppressExceptions_ioException() + { + CloseableUtils.closeAndSuppressExceptions(ioExceptionCloseable, chomper); + assertClosed(ioExceptionCloseable); + Assert.assertEquals(1, chomped.get()); + } + + @Test + public void test_closeAndSuppressExceptions_runtimeException() + { + CloseableUtils.closeAndSuppressExceptions(runtimeExceptionCloseable, chomper); + assertClosed(runtimeExceptionCloseable); + Assert.assertEquals(1, chomped.get()); + } + + @Test + public void test_closeAndSuppressExceptions_assertionError() + { + CloseableUtils.closeAndSuppressExceptions(assertionErrorCloseable, chomper); + assertClosed(assertionErrorCloseable); + Assert.assertEquals(1, chomped.get()); + } + + @Test + public void test_closeInCatch_improper() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeInCatch(null, quietCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(quietCloseable.isClosed()); + + Assert.assertThat(e, CoreMatchers.instanceOf(IllegalStateException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Must be called with non-null caught exception")) + ); + } + + @Test + public void test_closeInCatch_quiet() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeInCatch(new RuntimeException("this one was caught"), quietCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(quietCloseable.isClosed()); + + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ); + } + + @Test + public void test_closeInCatch_ioException() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeInCatch(new IOException("this one was caught"), ioExceptionCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(ioExceptionCloseable.isClosed()); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(IOException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ); + + // Second exception + Assert.assertEquals(1, e.getSuppressed().length); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IOException.class)); + } + + @Test + public void test_closeInCatch_runtimeException() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeInCatch(new RuntimeException("this one was caught"), runtimeExceptionCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(runtimeExceptionCloseable.isClosed()); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ); + + // Second exception + Assert.assertEquals(1, e.getSuppressed().length); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IllegalArgumentException.class)); + } + + @Test + public void test_closeAndWrapInCatch_improper() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeAndWrapInCatch(null, quietCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(quietCloseable.isClosed()); + + Assert.assertThat(e, CoreMatchers.instanceOf(IllegalStateException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Must be called with non-null caught exception")) + ); + } + + @Test + public void test_closeAndWrapInCatch_quiet() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeAndWrapInCatch(new RuntimeException("this one was caught"), quietCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(quietCloseable.isClosed()); + + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ); + } + + @Test + public void test_closeAndWrapInCatch_ioException() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeAndWrapInCatch(new IOException("this one was caught"), ioExceptionCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(ioExceptionCloseable.isClosed()); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("java.io.IOException: this one was caught")) + ); + Assert.assertThat(e, ThrowableCauseMatcher.hasCause(CoreMatchers.instanceOf(IOException.class))); + Assert.assertThat( + e, + ThrowableCauseMatcher.hasCause( + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ) + ); + + // Second exception + Assert.assertEquals(1, e.getCause().getSuppressed().length); + Assert.assertThat(e.getCause().getSuppressed()[0], CoreMatchers.instanceOf(IOException.class)); + } + + @Test + public void test_closeAndWrapInCatch_runtimeException() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeAndWrapInCatch(new RuntimeException("this one was caught"), runtimeExceptionCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(runtimeExceptionCloseable.isClosed()); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ); + + // Second exception + Assert.assertEquals(1, e.getSuppressed().length); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IllegalArgumentException.class)); + } + + @Test + public void test_closeAndWrapInCatch_assertionError() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeAndWrapInCatch(new RuntimeException("this one was caught"), assertionErrorCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(assertionErrorCloseable.isClosed()); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ); + + // Second exception + Assert.assertEquals(1, e.getSuppressed().length); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(AssertionError.class)); + } + + private static void assertClosed(final TestCloseable... closeables) + { + for (TestCloseable closeable : closeables) { + Assert.assertTrue(closeable.isClosed()); + } + } + + private static class TestCloseable implements Closeable + { + @Nullable + private final Throwable e; + private final AtomicBoolean closed = new AtomicBoolean(false); + + TestCloseable(@Nullable Throwable e) + { + this.e = e; + } + + @Override + public void close() throws IOException + { + closed.set(true); + if (e != null) { + Throwables.propagateIfInstanceOf(e, IOException.class); + throw Throwables.propagate(e); + } + } + + public boolean isClosed() + { + return closed.get(); + } + } +} diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidLeaderSelector.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidLeaderSelector.java index 2cf5a8d581b..e8b304218ba 100644 --- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidLeaderSelector.java +++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidLeaderSelector.java @@ -25,9 +25,9 @@ import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.DruidNode; +import org.apache.druid.utils.CloseableUtils; import javax.annotation.Nullable; @@ -45,7 +45,13 @@ public class K8sDruidLeaderSelector implements DruidLeaderSelector @SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT", justification = "incremented but in single thread") private volatile int term = 0; - public K8sDruidLeaderSelector(@Self DruidNode self, String lockResourceName, String lockResourceNamespace, K8sDiscoveryConfig discoveryConfig, K8sLeaderElectorFactory k8sLeaderElectorFactory) + public K8sDruidLeaderSelector( + @Self DruidNode self, + String lockResourceName, + String lockResourceNamespace, + K8sDiscoveryConfig discoveryConfig, + K8sLeaderElectorFactory k8sLeaderElectorFactory + ) { this.leaderLatch = new LeaderElectorAsyncWrapper( self.getServiceScheme() + "://" + self.getHostAndPortToUse(), @@ -72,8 +78,7 @@ public class K8sDruidLeaderSelector implements DruidLeaderSelector } catch (Throwable ex) { LOGGER.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); - - CloseQuietly.close(leaderLatch); + closeLeaderLatchQuietly(); leader = false; //Exit and Kubernetes would simply create a new replacement pod. System.exit(1); @@ -147,6 +152,15 @@ public class K8sDruidLeaderSelector implements DruidLeaderSelector if (!lifecycleLock.canStop()) { throw new ISE("can't stop."); } - CloseQuietly.close(leaderLatch); + + closeLeaderLatchQuietly(); + } + + private void closeLeaderLatchQuietly() + { + CloseableUtils.closeAndSuppressExceptions( + leaderLatch, + e -> LOGGER.warn("Exception caught while cleaning up leader latch") + ); } } diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java index 08dfafd1e51..6aa271b6d90 100644 --- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java +++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java @@ -32,11 +32,11 @@ import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.DruidNode; +import org.apache.druid.utils.CloseableUtils; import java.io.Closeable; import java.net.SocketTimeoutException; @@ -340,7 +340,8 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider try { LOGGER.info("Stopping NodeRoleWatcher for [%s]...", nodeRole); - CloseQuietly.close(watchRef.getAndSet(STOP_MARKER)); + // STOP_MARKER cannot throw exceptions on close(), so this is OK. + CloseableUtils.closeAndSuppressExceptions(STOP_MARKER, e -> {}); watchExecutor.shutdownNow(); if (!watchExecutor.awaitTermination(15, TimeUnit.SECONDS)) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 4e1c5e7a275..30382eaefbd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -61,7 +61,6 @@ import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.NoopQueryRunner; @@ -86,6 +85,7 @@ import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import org.apache.druid.segment.realtime.plumber.Committers; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.partition.NumberedPartialShardSpec; +import org.apache.druid.utils.CloseableUtils; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import javax.servlet.http.HttpServletRequest; @@ -428,9 +428,9 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements finally { toolbox.getChatHandlerProvider().unregister(getId()); - CloseQuietly.close(firehose); + CloseableUtils.closeAndSuppressExceptions(firehose, e -> log.warn("Failed to close Firehose")); appenderator.close(); - CloseQuietly.close(driver); + CloseableUtils.closeAndSuppressExceptions(driver, e -> log.warn("Failed to close AppenderatorDriver")); toolbox.removeMonitor(metricsMonitor); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java index 6a5fc698daf..85729d7ab8b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java @@ -43,7 +43,6 @@ import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.NoopQueryRunner; import org.apache.druid.query.Query; @@ -67,6 +66,7 @@ import org.apache.druid.segment.realtime.plumber.RealtimePlumberSchool; import org.apache.druid.segment.realtime.plumber.VersioningPolicy; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.utils.CloseableUtils; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -302,7 +302,11 @@ public class RealtimeIndexTask extends AbstractTask { try { // Side effect: Calling getVersion causes a lock to be acquired - final TimeChunkLockAcquireAction action = new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, interval, lockTimeoutMs); + final TimeChunkLockAcquireAction action = new TimeChunkLockAcquireAction( + TaskLockType.EXCLUSIVE, + interval, + lockTimeoutMs + ); final TaskLock lock = Preconditions.checkNotNull( toolbox.getTaskActionClient().submit(action), "Cannot acquire a lock for interval[%s]", @@ -471,7 +475,7 @@ public class RealtimeIndexTask extends AbstractTask } finally { if (firehose != null) { - CloseQuietly.close(firehose); + CloseableUtils.closeAndSuppressExceptions(firehose, e -> log.warn("Failed to close Firehose")); } toolbox.removeMonitor(metricsMonitor); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java index f48d65b4c70..a4d1c2e793e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java @@ -38,7 +38,6 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielders; -import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.query.filter.DimFilter; @@ -58,6 +57,7 @@ import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.data.IndexedInts; import org.apache.druid.segment.filter.Filters; import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter; +import org.apache.druid.utils.CloseableUtils; import org.apache.druid.utils.CollectionUtils; import java.io.File; @@ -208,10 +208,7 @@ public class DruidSegmentReader extends IntermediateRowParsingReader( () -> Sequences.withBaggage( memoizedSupplier.get().results(dimsToInclude), - closeOnSequenceRead ? () -> CloseQuietly.close(memoizedSupplier.get()) : () -> {} + closeOnSequenceRead + ? () -> CloseableUtils.closeAndWrapExceptions(memoizedSupplier.get()) + : () -> {} ) ), subtotalQuery, null ); } - catch (Exception ex) { - CloseQuietly.close(baseResultsSupplier.get()); - throw ex; + catch (Throwable e) { + throw CloseableUtils.closeAndWrapInCatch(e, baseResultsSupplier.get()); } } diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java index 78a369a2b89..413cc8889fc 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java @@ -27,7 +27,6 @@ import com.google.inject.Inject; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.guava.BaseSequence; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.GenericQueryMetricsFactory; @@ -39,6 +38,7 @@ import org.apache.druid.query.aggregation.MetricManipulationFn; import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.utils.CloseableUtils; import java.util.List; import java.util.Map; @@ -108,7 +108,7 @@ public class ScanQueryQueryToolChest extends QueryToolChest @Override protected void loadBuffer(int bufferNum) { - CloseQuietly.close(holder); + if (holder != null) { + holder.close(); + } holder = singleThreadedLongBuffers.get(bufferNum); buffer = holder.get(); // asLongBuffer() makes the longBuffer's position = 0 @@ -190,7 +191,9 @@ public class BlockLayoutColumnarLongsSupplier implements Supplier protected void loadBuffer(int bufferNum) { - CloseQuietly.close(holder); + if (holder != null) { + holder.close(); + } holder = singleThreadedLongBuffers.get(bufferNum); buffer = holder.get(); currBufferNum = bufferNum; diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java index d9b3cf97085..e685721eb88 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java @@ -24,7 +24,6 @@ import com.google.common.base.Preconditions; import it.unimi.dsi.fastutil.ints.IntArrayList; import org.apache.druid.collections.ResourceHolder; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; @@ -134,7 +133,11 @@ public class CompressedColumnarIntsSupplier implements WritableSupplier holder; - /** buffer's position must be 0 */ + /** + * buffer's position must be 0 + */ IntBuffer buffer; @Override @@ -317,7 +322,9 @@ public class CompressedColumnarIntsSupplier implements WritableSupplier implements CloseableIndexed, Serializer headerOut.writeInt(Ints.checkedCast(valuesOut.size())); if (prevVal instanceof Closeable) { - CloseQuietly.close((Closeable) prevVal); + CloseableUtils.closeAndWrapExceptions((Closeable) prevVal); } prevVal = next; } while (objects.hasNext()); if (prevVal instanceof Closeable) { - CloseQuietly.close((Closeable) prevVal); + CloseableUtils.closeAndWrapExceptions((Closeable) prevVal); } } catch (IOException e) { diff --git a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java index 34ac51c2b54..7ade12eacae 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java @@ -20,14 +20,15 @@ package org.apache.druid.segment.join; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.filter.Filter; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.utils.CloseableUtils; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -43,6 +44,8 @@ import java.util.Optional; */ public class HashJoinSegment implements SegmentReference { + private static final Logger log = new Logger(HashJoinSegment.class); + private final SegmentReference baseSegment; private final Filter baseFilter; private final List clauses; @@ -131,14 +134,16 @@ public class HashJoinSegment implements SegmentReference }).orElse(true); } if (acquireFailed) { - CloseQuietly.close(closer); + CloseableUtils.closeAndWrapExceptions(closer); return Optional.empty(); } else { return Optional.of(closer); } } - catch (Exception ex) { - CloseQuietly.close(closer); + catch (Throwable e) { + // acquireReferences is not permitted to throw exceptions. + CloseableUtils.closeAndSuppressExceptions(closer, e::addSuppressed); + log.warn(e, "Exception encountered while trying to acquire reference"); return Optional.empty(); } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java index 1ed990b5f97..b8bc3f051bb 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java @@ -41,7 +41,6 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularity; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielder; @@ -85,6 +84,7 @@ import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.utils.CloseableUtils; import org.junit.rules.TemporaryFolder; import java.io.Closeable; @@ -712,7 +712,7 @@ public class AggregationTestHelper implements Closeable } finally { for (Segment segment : segments) { - CloseQuietly.close(segment); + CloseableUtils.closeAndWrapExceptions(segment); } } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java index 30419b04740..2e516cebf63 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java @@ -28,7 +28,6 @@ import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.Row; import org.apache.druid.data.input.impl.NoopInputRowParser; import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.Druids; import org.apache.druid.query.Result; @@ -41,6 +40,7 @@ import org.apache.druid.segment.IncrementalIndexSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.utils.CloseableUtils; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.After; @@ -133,7 +133,7 @@ public class StringColumnAggregationTest { if (segments != null) { for (Segment seg : segments) { - CloseQuietly.close(seg); + CloseableUtils.closeAndWrapExceptions(seg); } } } diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java index 43ea5eb0863..52dca67257c 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java @@ -26,7 +26,6 @@ import it.unimi.dsi.fastutil.ints.IntArrayList; import org.apache.commons.io.IOUtils; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.Smoosh; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; @@ -35,6 +34,7 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.WriteOutBytes; +import org.apache.druid.utils.CloseableUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -226,7 +226,7 @@ public class CompressedColumnarIntsSerializerTest for (int i = 0; i < vals.length; ++i) { Assert.assertEquals(vals[i], columnarInts.get(i)); } - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); } private void checkV2SerializedSizeAndData(int chunkFactor) throws Exception @@ -269,7 +269,7 @@ public class CompressedColumnarIntsSerializerTest for (int i = 0; i < vals.length; ++i) { Assert.assertEquals(vals[i], columnarInts.get(i)); } - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); mapper.close(); } } diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java index a857eb4256c..01c9cc26dca 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java @@ -22,9 +22,9 @@ package org.apache.druid.segment.data; import com.google.common.primitives.Longs; import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.segment.CompressedPools; +import org.apache.druid.utils.CloseableUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -58,7 +58,7 @@ public class CompressedColumnarIntsSupplierTest extends CompressionStrategyTest public void setUp() { closer = Closer.create(); - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); columnarInts = null; supplier = null; vals = null; @@ -68,12 +68,12 @@ public class CompressedColumnarIntsSupplierTest extends CompressionStrategyTest public void tearDown() throws Exception { closer.close(); - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); } private void setupSimple(final int chunkSize) { - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); vals = new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16}; @@ -97,7 +97,7 @@ public class CompressedColumnarIntsSupplierTest extends CompressionStrategyTest private void makeWithSerde(final int chunkSize) throws IOException { - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); ByteArrayOutputStream baos = new ByteArrayOutputStream(); final CompressedColumnarIntsSupplier theSupplier = CompressedColumnarIntsSupplier.fromIntBuffer( @@ -271,7 +271,7 @@ public class CompressedColumnarIntsSupplierTest extends CompressionStrategyTest stopLatch.await(); } finally { - CloseQuietly.close(columnarInts2); + CloseableUtils.closeAndWrapExceptions(columnarInts2); } if (failureHappened.get()) { diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java index 284aea33b75..338897be752 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java @@ -23,10 +23,10 @@ import com.google.common.base.Supplier; import com.google.common.primitives.Doubles; import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; +import org.apache.druid.utils.CloseableUtils; import org.junit.Assert; import org.junit.Ignore; import org.junit.Rule; @@ -321,7 +321,7 @@ public class CompressedDoublesSerdeTest stopLatch.await(); } finally { - CloseQuietly.close(indexed2); + CloseableUtils.closeAndWrapExceptions(indexed2); } if (failureHappened.get()) { diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java index d11c089abf2..c831170f48f 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java @@ -23,10 +23,10 @@ import com.google.common.base.Supplier; import com.google.common.primitives.Floats; import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; +import org.apache.druid.utils.CloseableUtils; import org.junit.Assert; import org.junit.Ignore; import org.junit.Rule; @@ -78,8 +78,29 @@ public class CompressedFloatsSerdeTest private final float[] values2 = {13.2f, 6.1f, 0.001f, 123f, 12572f, 123.1f, 784.4f, 6892.8634f, 8.341111f}; private final float[] values3 = {0.001f, 0.001f, 0.001f, 0.001f, 0.001f, 100f, 100f, 100f, 100f, 100f}; private final float[] values4 = {0f, 0f, 0f, 0f, 0.01f, 0f, 0f, 0f, 21.22f, 0f, 0f, 0f, 0f, 0f, 0f}; - private final float[] values5 = {123.16f, 1.12f, 62.00f, 462.12f, 517.71f, 56.54f, 971.32f, 824.22f, 472.12f, 625.26f}; - private final float[] values6 = {1000000f, 1000001f, 1000002f, 1000003f, 1000004f, 1000005f, 1000006f, 1000007f, 1000008f}; + private final float[] values5 = { + 123.16f, + 1.12f, + 62.00f, + 462.12f, + 517.71f, + 56.54f, + 971.32f, + 824.22f, + 472.12f, + 625.26f + }; + private final float[] values6 = { + 1000000f, + 1000001f, + 1000002f, + 1000003f, + 1000004f, + 1000005f, + 1000006f, + 1000007f, + 1000008f + }; private final float[] values7 = { Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY, 12378.5734f, -12718243.7496f, -93653653.1f, 12743153.385534f, 21431.414538f, 65487435436632.123f, -43734526234564.65f @@ -321,7 +342,7 @@ public class CompressedFloatsSerdeTest stopLatch.await(); } finally { - CloseQuietly.close(indexed2); + CloseableUtils.closeAndWrapExceptions(indexed2); } if (failureHappened.get()) { diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java index f0effe78a2d..6d6fe4549f9 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java @@ -23,10 +23,10 @@ import com.google.common.base.Supplier; import com.google.common.primitives.Longs; import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; +import org.apache.druid.utils.CloseableUtils; import org.junit.Assert; import org.junit.Ignore; import org.junit.Rule; @@ -356,7 +356,7 @@ public class CompressedLongsSerdeTest stopLatch.await(); } finally { - CloseQuietly.close(indexed2); + CloseableUtils.closeAndWrapExceptions(indexed2); } if (failureHappened.get()) { diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java index 5ba842013bb..bb5868f6df2 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java @@ -25,7 +25,6 @@ import com.google.common.collect.Sets; import com.google.common.primitives.Ints; import it.unimi.dsi.fastutil.ints.IntArrayList; import org.apache.commons.io.IOUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.Smoosh; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; @@ -34,6 +33,7 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.WriteOutBytes; +import org.apache.druid.utils.CloseableUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -154,7 +154,7 @@ public class CompressedVSizeColumnarIntsSerializerTest for (int i = 0; i < vals.length; ++i) { Assert.assertEquals(vals[i], columnarInts.get(i)); } - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); } @Test @@ -268,8 +268,7 @@ public class CompressedVSizeColumnarIntsSerializerTest for (int i = 0; i < vals.length; ++i) { Assert.assertEquals(vals[i], columnarInts.get(i)); } - CloseQuietly.close(columnarInts); - mapper.close(); + CloseableUtils.closeAll(columnarInts, mapper); } @Test diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplierTest.java index f113dc3510a..48a443ef661 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplierTest.java @@ -27,9 +27,9 @@ import com.google.common.primitives.Longs; import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.segment.CompressedPools; +import org.apache.druid.utils.CloseableUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -67,7 +67,7 @@ public class CompressedVSizeColumnarIntsSupplierTest extends CompressionStrategy ); } - private static final int[] MAX_VALUES = new int[] {0xFF, 0xFFFF, 0xFFFFFF, 0x0FFFFFFF}; + private static final int[] MAX_VALUES = new int[]{0xFF, 0xFFFF, 0xFFFFFF, 0x0FFFFFFF}; public CompressedVSizeColumnarIntsSupplierTest(CompressionStrategy compressionStrategy, ByteOrder byteOrder) { @@ -86,7 +86,7 @@ public class CompressedVSizeColumnarIntsSupplierTest extends CompressionStrategy public void setUp() { closer = Closer.create(); - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); columnarInts = null; supplier = null; vals = null; @@ -95,13 +95,12 @@ public class CompressedVSizeColumnarIntsSupplierTest extends CompressionStrategy @After public void tearDown() throws Exception { - CloseQuietly.close(columnarInts); - closer.close(); + CloseableUtils.closeAll(columnarInts, closer); } private void setupSimple(final int chunkSize) { - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); vals = new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16}; @@ -126,7 +125,7 @@ public class CompressedVSizeColumnarIntsSupplierTest extends CompressionStrategy private void makeWithSerde(final int chunkSize) throws IOException { - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); ByteArrayOutputStream baos = new ByteArrayOutputStream(); final CompressedVSizeColumnarIntsSupplier theSupplier = CompressedVSizeColumnarIntsSupplier.fromList( @@ -212,8 +211,14 @@ public class CompressedVSizeColumnarIntsSupplierTest extends CompressionStrategy public void testmaxIntsInBuffer() { Assert.assertEquals(CompressedPools.BUFFER_SIZE, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(1)); - Assert.assertEquals(CompressedPools.BUFFER_SIZE / 2, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(2)); - Assert.assertEquals(CompressedPools.BUFFER_SIZE / 4, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(4)); + Assert.assertEquals( + CompressedPools.BUFFER_SIZE / 2, + CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(2) + ); + Assert.assertEquals( + CompressedPools.BUFFER_SIZE / 4, + CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(4) + ); Assert.assertEquals(CompressedPools.BUFFER_SIZE, 0x10000); // nearest power of 2 is 2^14 Assert.assertEquals(1 << 14, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(3)); @@ -330,7 +335,7 @@ public class CompressedVSizeColumnarIntsSupplierTest extends CompressionStrategy stopLatch.await(); } finally { - CloseQuietly.close(columnarInts2); + CloseableUtils.closeAndWrapExceptions(columnarInts2); } if (failureHappened.get()) { diff --git a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java index de6485fff1f..6877416e764 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java @@ -25,7 +25,6 @@ import com.google.common.collect.Sets; import org.apache.commons.io.IOUtils; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.Smoosh; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; @@ -34,6 +33,7 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.WriteOutBytes; +import org.apache.druid.utils.CloseableUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; @@ -322,12 +322,17 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest Assert.assertEquals(subVals.get(j), vals.get(i)[j]); } } - CloseQuietly.close(columnarMultiInts); - mapper.close(); + CloseableUtils.closeAll(columnarMultiInts, mapper); } } - private void generateV2SerializedSizeAndData(long numRows, int maxValue, int maxValuesPerRow, int offsetChunkFactor, int valueChunkFactor) throws Exception + private void generateV2SerializedSizeAndData( + long numRows, + int maxValue, + int maxValuesPerRow, + int offsetChunkFactor, + int valueChunkFactor + ) throws Exception { File tmpDirectory = FileUtils.createTempDir(StringUtils.format( "CompressedVSizeIndexedV3WriterTest_%d_%d", @@ -395,8 +400,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest Assert.assertEquals(subVals.get(j), expected[j]); } } - CloseQuietly.close(columnarMultiInts); - mapper.close(); + CloseableUtils.closeAll(columnarMultiInts, mapper); } } } diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index 75a2a4507e4..ead9e2d3274 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -32,7 +32,6 @@ import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.BaseSequence; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.logger.Logger; @@ -58,6 +57,7 @@ import org.apache.druid.query.context.ConcurrentResponseContext; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.context.ResponseContext.Key; import org.apache.druid.server.QueryResource; +import org.apache.druid.utils.CloseableUtils; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.handler.codec.http.HttpChunk; @@ -518,7 +518,7 @@ public class DirectDruidClient implements QueryRunner @Override public void cleanup(JsonParserIterator iterFromMake) { - CloseQuietly.close(iterFromMake); + CloseableUtils.closeAndWrapExceptions(iterFromMake); } } ); diff --git a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java index b7b4d631e42..35532276e93 100644 --- a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java +++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java @@ -26,7 +26,6 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.Query; import org.apache.druid.query.QueryCapacityExceededException; @@ -35,6 +34,7 @@ import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.QueryUnsupportedException; import org.apache.druid.query.ResourceLimitExceededException; +import org.apache.druid.utils.CloseableUtils; import javax.annotation.Nullable; import java.io.Closeable; @@ -96,7 +96,7 @@ public class JsonParserIterator implements Iterator, Closeable return false; } if (jp.getCurrentToken() == JsonToken.END_ARRAY) { - CloseQuietly.close(jp); + CloseableUtils.closeAndWrapExceptions(jp); return false; } diff --git a/server/src/main/java/org/apache/druid/client/cache/HybridCache.java b/server/src/main/java/org/apache/druid/client/cache/HybridCache.java index 65bf957a27e..509b19aa653 100644 --- a/server/src/main/java/org/apache/druid/client/cache/HybridCache.java +++ b/server/src/main/java/org/apache/druid/client/cache/HybridCache.java @@ -156,7 +156,7 @@ public class HybridCache implements Cache @LifecycleStop public void close() throws IOException { - CloseableUtils.closeBoth(level1, level2); + CloseableUtils.closeAll(level1, level2); } @Override diff --git a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java index 533389a02c3..b65d85644d4 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java @@ -31,15 +31,15 @@ import org.apache.curator.utils.ZKPaths; import org.apache.druid.curator.cache.PathChildrenCacheFactory; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.guava.CloseQuietly; -import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.utils.CloseableUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -135,12 +135,11 @@ public class Announcer started = false; - Closer closer = Closer.create(); - for (PathChildrenCache cache : listeners.values()) { - closer.register(cache); - } try { - CloseQuietly.close(closer); + CloseableUtils.closeAll(listeners.values()); + } + catch (IOException e) { + throw new RuntimeException(e); } finally { pathChildrenCacheExecutor.shutdown(); @@ -413,9 +412,8 @@ public class Announcer try { cache.start(); } - catch (Exception e) { - CloseQuietly.close(cache); - throw new RuntimeException(e); + catch (Throwable e) { + throw CloseableUtils.closeAndWrapInCatch(e, cache); } } diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java index f3c0afe326c..1aa77cec5fd 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -30,9 +30,9 @@ import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.DruidNode; +import org.apache.druid.utils.CloseableUtils; import javax.annotation.Nullable; import java.util.concurrent.ExecutorService; @@ -40,6 +40,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; /** + * */ public class CuratorDruidLeaderSelector implements DruidLeaderSelector { @@ -100,8 +101,11 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); // give others a chance to become leader. - final LeaderLatch oldLatch = createNewLeaderLatchWithListener(); - CloseQuietly.close(oldLatch); + CloseableUtils.closeAndSuppressExceptions( + createNewLeaderLatchWithListener(), + e -> log.warn("Could not close old leader latch; continuing with new one anyway.") + ); + leader = false; try { //Small delay before starting the latch so that others waiting are chosen to become leader. @@ -207,7 +211,8 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector if (!lifecycleLock.canStop()) { throw new ISE("can't stop."); } - CloseQuietly.close(leaderLatch.get()); + + CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> log.warn(e, "Failed to close LeaderLatch.")); listenerExecutor.shutdownNow(); } } diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java index 2f60bd12ab8..4639a6c5177 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java @@ -155,7 +155,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide closer.registerAll(nodeRoleWatchers.values()); closer.registerAll(nodeDiscoverers); - CloseableUtils.closeBoth(closer, listenerExecutor::shutdownNow); + CloseableUtils.closeAll(closer, listenerExecutor::shutdownNow); } private static class NodeRoleWatcher implements DruidNodeDiscovery, Closeable @@ -213,7 +213,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide @Override public void close() throws IOException { - CloseableUtils.closeBoth(cache, cacheExecutor::shutdownNow); + CloseableUtils.closeAll(cache, cacheExecutor::shutdownNow); } @Override diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index da626c1f768..ae6994134bb 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -32,7 +32,6 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.FunctionalIterable; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -68,6 +67,7 @@ import org.apache.druid.segment.realtime.plumber.Sink; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; +import org.apache.druid.utils.CloseableUtils; import org.joda.time.Interval; import java.io.Closeable; @@ -259,9 +259,8 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker ); return new Pair<>(segmentAndCloseable.lhs.getDataInterval(), runner); } - catch (RuntimeException e) { - CloseQuietly.close(segmentAndCloseable.rhs); - throw e; + catch (Throwable e) { + throw CloseableUtils.closeAndWrapInCatch(e, segmentAndCloseable.rhs); } } ) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java index 52e805a2361..88d6d6ede22 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java @@ -119,7 +119,7 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory log.warn("Could not close log file for %s. Creating new log file anyway.", currentDay) + ); + fileWriter = getFileWriter(); } } @@ -124,7 +133,7 @@ public class FileRequestLogger implements RequestLogger public void stop() { synchronized (lock) { - CloseQuietly.close(fileWriter); + CloseableUtils.closeAndWrapExceptions(fileWriter); } } diff --git a/server/src/test/java/org/apache/druid/client/cache/HybridCacheTest.java b/server/src/test/java/org/apache/druid/client/cache/HybridCacheTest.java index b5d96ebe2c2..aff62b55571 100644 --- a/server/src/test/java/org/apache/druid/client/cache/HybridCacheTest.java +++ b/server/src/test/java/org/apache/druid/client/cache/HybridCacheTest.java @@ -35,6 +35,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.util.HashSet; import java.util.Map; @@ -82,10 +83,12 @@ public class HybridCacheTest } @Test - public void testSanity() + public void testSanity() throws IOException { - final MapCache l1 = new MapCache(new ByteCountingLRUMap(1024 * 1024)); - final MapCache l2 = new MapCache(new ByteCountingLRUMap(1024 * 1024)); + final ByteCountingLRUMap l1Map = new ByteCountingLRUMap(1024 * 1024); + final ByteCountingLRUMap l2Map = new ByteCountingLRUMap(1024 * 1024); + final MapCache l1 = new MapCache(l1Map); + final MapCache l2 = new MapCache(l2Map); HybridCache cache = new HybridCache(new HybridCacheConfig(), l1, l2); final Cache.NamedKey key1 = new Cache.NamedKey("a", HI); @@ -175,5 +178,10 @@ public class HybridCacheTest Assert.assertEquals(hits + 1, cache.getStats().getNumHits()); Assert.assertEquals(misses + 1, cache.getStats().getNumMisses()); } + + // test close + cache.close(); + Assert.assertEquals("l1 size after close()", 0, l1Map.size()); + Assert.assertEquals("l2 size after close()", 0, l2Map.size()); } }