diff --git a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java index c03e5f6d8e8..5abadb6c2ad 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java @@ -25,9 +25,8 @@ import com.fasterxml.jackson.core.ObjectCodec; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.guava.CloseQuietly; -import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.utils.CloseableUtils; import java.io.Closeable; import java.io.IOException; @@ -80,7 +79,7 @@ public class JsonIterator implements CloseableIterator return false; } if (jp.getCurrentToken() == JsonToken.END_ARRAY) { - CloseQuietly.close(jp); + CloseableUtils.closeAndWrapExceptions(jp); return false; } return true; @@ -131,11 +130,6 @@ public class JsonIterator implements CloseableIterator @Override public void close() throws IOException { - Closer closer = Closer.create(); - if (jp != null) { - closer.register(jp); - } - closer.register(resourceCloser); - closer.close(); + CloseableUtils.closeAll(jp, resourceCloser); } } diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/CloseQuietly.java b/core/src/main/java/org/apache/druid/java/util/common/guava/CloseQuietly.java deleted file mode 100644 index 8b4807b9045..00000000000 --- a/core/src/main/java/org/apache/druid/java/util/common/guava/CloseQuietly.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.java.util.common.guava; - -import org.apache.druid.java.util.common.logger.Logger; - -import java.io.Closeable; -import java.io.IOException; - -/** - */ -public class CloseQuietly -{ - private static final Logger log = new Logger(CloseQuietly.class); - - public static void close(Closeable closeable) - { - if (closeable == null) { - return; - } - try { - closeable.close(); - } - catch (IOException e) { - log.error(e, "IOException thrown while closing Closeable."); - } - } -} diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java index 2a5c489cf72..749f3e64144 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java +++ b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.QueryTimeoutException; +import org.apache.druid.utils.CloseableUtils; import org.apache.druid.utils.JvmUtils; import javax.annotation.Nullable; @@ -175,6 +176,7 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase new BaseSequence.IteratorMaker>() { private boolean shouldCancelOnCleanup = true; + @Override public Iterator make() { @@ -463,18 +465,19 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase final int computedNumParallelTasks = Math.max(computedOptimalParallelism, 1); - LOG.debug("Computed parallel tasks: [%s]; ForkJoinPool details - sequence parallelism: [%s] " - + "active threads: [%s] running threads: [%s] queued submissions: [%s] queued tasks: [%s] " - + "pool parallelism: [%s] pool size: [%s] steal count: [%s]", - computedNumParallelTasks, - parallelism, - getPool().getActiveThreadCount(), - runningThreadCount, - submissionCount, - getPool().getQueuedTaskCount(), - getPool().getParallelism(), - getPool().getPoolSize(), - getPool().getStealCount() + LOG.debug( + "Computed parallel tasks: [%s]; ForkJoinPool details - sequence parallelism: [%s] " + + "active threads: [%s] running threads: [%s] queued submissions: [%s] queued tasks: [%s] " + + "pool parallelism: [%s] pool size: [%s] steal count: [%s]", + computedNumParallelTasks, + parallelism, + getPool().getActiveThreadCount(), + runningThreadCount, + submissionCount, + getPool().getQueuedTaskCount(), + getPool().getParallelism(), + getPool().getPoolSize(), + getPool().getStealCount() ); return computedNumParallelTasks; @@ -609,7 +612,10 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase // which we want to target a 10ms task run time. smooth this value with a cumulative moving average in order // to prevent normal jitter in processing time from skewing the next yield value too far in any direction final long elapsedNanos = System.nanoTime() - start; - final double nextYieldAfter = Math.max((double) targetTimeNanos * ((double) yieldAfter / elapsedCpuNanos), 1.0); + final double nextYieldAfter = Math.max( + (double) targetTimeNanos * ((double) yieldAfter / elapsedCpuNanos), + 1.0 + ); final long recursionDepth = metricsAccumulator.getTaskCount(); final double cumulativeMovingAverage = (nextYieldAfter + (recursionDepth * yieldAfter)) / (recursionDepth + 1); @@ -1376,6 +1382,6 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase { Closer closer = Closer.create(); closer.registerAll(cursors); - CloseQuietly.close(closer); + CloseableUtils.closeAndSuppressExceptions(closer, e -> LOG.warn(e, "Failed to close result cursors")); } } diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java index 68555b002b5..279a6a8b51d 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java @@ -19,8 +19,8 @@ package org.apache.druid.java.util.http.client; +import com.google.common.base.Throwables; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.http.client.netty.HttpClientPipelineFactory; import org.apache.druid.java.util.http.client.pool.ChannelResourceFactory; @@ -39,17 +39,12 @@ import org.jboss.netty.util.Timer; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.security.KeyManagementException; import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.cert.CertificateException; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** + * */ public class HttpClientInit { @@ -110,11 +105,8 @@ public class HttpClientInit public static SSLContext sslContextWithTrustedKeyStore(final String keyStorePath, final String keyStorePassword) { - FileInputStream in = null; - try { + try (FileInputStream in = new FileInputStream(keyStorePath)) { final KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); - - in = new FileInputStream(keyStorePath); ks.load(in, keyStorePassword.toCharArray()); in.close(); @@ -125,27 +117,10 @@ public class HttpClientInit return sslContext; } - catch (CertificateException e) { + catch (Exception e) { + Throwables.propagateIfPossible(e); throw new RuntimeException(e); } - catch (NoSuchAlgorithmException e) { - throw new RuntimeException(e); - } - catch (KeyStoreException e) { - throw new RuntimeException(e); - } - catch (KeyManagementException e) { - throw new RuntimeException(e); - } - catch (FileNotFoundException e) { - throw new RuntimeException(e); - } - catch (IOException e) { - throw new RuntimeException(e); - } - finally { - CloseQuietly.close(in); - } } private static ClientBootstrap createBootstrap(Lifecycle lifecycle, Timer timer, int bossPoolSize, int workerPoolSize) diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java index e395a717239..e1094ba8908 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java @@ -20,7 +20,6 @@ package org.apache.druid.java.util.http.client.response; import com.google.common.io.ByteSource; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.logger.Logger; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBufferInputStream; @@ -57,19 +56,17 @@ public class SequenceInputStreamResponseHandler implements HttpResponseHandler handleResponse(HttpResponse response, TrafficCop trafficCop) { - ChannelBufferInputStream channelStream = null; - try { - channelStream = new ChannelBufferInputStream(response.getContent()); + try (ChannelBufferInputStream channelStream = new ChannelBufferInputStream(response.getContent())) { queue.put(channelStream); } + catch (IOException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { log.error(e, "Queue appending interrupted"); Thread.currentThread().interrupt(); throw new RuntimeException(e); } - finally { - CloseQuietly.close(channelStream); - } byteCount.addAndGet(response.getContent().readableBytes()); return ClientResponse.finished( new SequenceInputStream( @@ -112,21 +109,19 @@ public class SequenceInputStreamResponseHandler implements HttpResponseHandler 0) { - ChannelBufferInputStream channelStream = null; - try { - channelStream = new ChannelBufferInputStream(channelBuffer); + try (ChannelBufferInputStream channelStream = new ChannelBufferInputStream(channelBuffer)) { queue.put(channelStream); // Queue.size() can be expensive in some implementations, but LinkedBlockingQueue.size is just an AtomicLong log.debug("Added stream. Queue length %d", queue.size()); } + catch (IOException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { log.warn(e, "Thread interrupted while adding to queue"); Thread.currentThread().interrupt(); throw new RuntimeException(e); } - finally { - CloseQuietly.close(channelStream); - } byteCount.addAndGet(bytes); } else { log.debug("Skipping zero length chunk"); diff --git a/core/src/main/java/org/apache/druid/utils/CloseableUtils.java b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java index 768abbe86dc..6a0a143b13e 100644 --- a/core/src/main/java/org/apache/druid/utils/CloseableUtils.java +++ b/core/src/main/java/org/apache/druid/utils/CloseableUtils.java @@ -19,11 +19,19 @@ package org.apache.druid.utils; +import com.google.common.collect.Lists; +import org.apache.druid.java.util.common.io.Closer; + +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Consumer; /** - * Methods in this class could have belonged to {@link org.apache.druid.java.util.common.io.Closer}, but not editing + * Methods in this class could have belonged to {@link Closer}, but not editing * that class to keep its source close to Guava source. */ public final class CloseableUtils @@ -34,15 +42,127 @@ public final class CloseableUtils * first.close(); * second.close(); * - * to have safety of {@link org.apache.druid.java.util.common.io.Closer}, but without associated boilerplate code + * to have safety of {@link Closer}, but without associated boilerplate code * of creating a Closer and registering objects in it. */ - public static void closeBoth(Closeable first, Closeable second) throws IOException + public static void closeAll(Closeable first, Closeable... others) throws IOException { - //noinspection EmptyTryBlock - try (Closeable ignore1 = second; - Closeable ignore2 = first) { - // piggy-back try-with-resources semantics + final List closeables = new ArrayList<>(others.length + 1); + closeables.add(first); + closeables.addAll(Arrays.asList(others)); + closeAll(closeables); + } + + /** + * Close all the provided {@param closeables}, from first to last. + */ + public static void closeAll(Iterable closeables) throws IOException + { + final Closer closer = Closer.create(); + + // Register in reverse order, so we close from first to last. + closer.registerAll(Lists.reverse(Lists.newArrayList(closeables))); + closer.close(); + } + + /** + * Like {@link Closeable#close()}, but guaranteed to throw {@param caught}. Will add any exceptions encountered + * during closing to {@param caught} using {@link Throwable#addSuppressed(Throwable)}. + * + * Should be used like {@code throw CloseableUtils.closeInCatch(e, closeable)}. (The "throw" is important for + * reachability detection.) + */ + public static RuntimeException closeInCatch( + final E caught, + @Nullable final Closeable closeable + ) throws E + { + if (caught == null) { + // Incorrect usage; throw an exception with an error message that may be useful to the programmer. + final RuntimeException e1 = new IllegalStateException("Must be called with non-null caught exception"); + + if (closeable != null) { + try { + closeable.close(); + } + catch (Throwable e2) { + e1.addSuppressed(e2); + } + } + + throw e1; + } + + if (closeable != null) { + try { + closeable.close(); + } + catch (Throwable e) { + caught.addSuppressed(e); + } + } + + throw caught; + } + + /** + * Like {@link #closeInCatch} but wraps {@param caught} in a {@link RuntimeException} if it is a checked exception. + */ + public static RuntimeException closeAndWrapInCatch( + final E caught, + @Nullable final Closeable closeable + ) + { + try { + throw closeInCatch(caught, closeable); + } + catch (RuntimeException | Error e) { + // Unchecked exception. + throw e; + } + catch (Throwable e) { + // Checked exception; must wrap. + throw new RuntimeException(e); + } + } + + /** + * Like {@link Closeable#close()} but wraps IOExceptions in RuntimeExceptions. + */ + public static void closeAndWrapExceptions(@Nullable final Closeable closeable) + { + if (closeable == null) { + return; + } + + try { + closeable.close(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Like {@link Closeable#close()} but sends any exceptions to the provided Consumer and then returns quietly. + * + * If the Consumer throws an exception, that exception is thrown by this method. So if your intent is to chomp + * exceptions, you should avoid writing a Consumer that might throw an exception. + */ + public static void closeAndSuppressExceptions( + @Nullable final Closeable closeable, + final Consumer chomper + ) + { + if (closeable == null) { + return; + } + + try { + closeable.close(); + } + catch (Throwable e) { + chomper.accept(e); } } diff --git a/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java b/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java new file mode 100644 index 00000000000..57fa61079d2 --- /dev/null +++ b/core/src/test/java/org/apache/druid/utils/CloseableUtilsTest.java @@ -0,0 +1,465 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.utils; + +import com.google.common.base.Throwables; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Test; +import org.junit.internal.matchers.ThrowableCauseMatcher; +import org.junit.internal.matchers.ThrowableMessageMatcher; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +public class CloseableUtilsTest +{ + private final TestCloseable quietCloseable = new TestCloseable(null); + private final TestCloseable quietCloseable2 = new TestCloseable(null); + private final TestCloseable ioExceptionCloseable = new TestCloseable(new IOException()); + private final TestCloseable runtimeExceptionCloseable = new TestCloseable(new IllegalArgumentException()); + private final TestCloseable assertionErrorCloseable = new TestCloseable(new AssertionError()); + + // For closeAndSuppressException tests. + private final AtomicLong chomped = new AtomicLong(); + private final Consumer chomper = e -> chomped.incrementAndGet(); + + @Test + public void test_closeAll_array_quiet() throws IOException + { + CloseableUtils.closeAll(quietCloseable, null, quietCloseable2); + assertClosed(quietCloseable, quietCloseable2); + } + + @Test + public void test_closeAll_list_quiet() throws IOException + { + CloseableUtils.closeAll(Arrays.asList(quietCloseable, null, quietCloseable2)); + assertClosed(quietCloseable, quietCloseable2); + } + + @Test + public void test_closeAll_array_loud() + { + Exception e = null; + try { + CloseableUtils.closeAll(quietCloseable, null, ioExceptionCloseable, quietCloseable2, runtimeExceptionCloseable); + } + catch (Exception e2) { + e = e2; + } + + assertClosed(quietCloseable, ioExceptionCloseable, quietCloseable2, runtimeExceptionCloseable); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(IOException.class)); + + // Second exception + Assert.assertEquals(1, e.getSuppressed().length); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IllegalArgumentException.class)); + } + + @Test + public void test_closeAll_list_loud() + { + Exception e = null; + try { + CloseableUtils.closeAll( + Arrays.asList( + quietCloseable, + null, + ioExceptionCloseable, + quietCloseable2, + runtimeExceptionCloseable + ) + ); + } + catch (Exception e2) { + e = e2; + } + + assertClosed(quietCloseable, ioExceptionCloseable, quietCloseable2, runtimeExceptionCloseable); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(IOException.class)); + + // Second exception + Assert.assertEquals(1, e.getSuppressed().length); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IllegalArgumentException.class)); + } + + @Test + public void test_closeAndWrapExceptions_null() + { + CloseableUtils.closeAndWrapExceptions(null); + // Nothing happens. + } + + @Test + public void test_closeAndWrapExceptions_quiet() + { + CloseableUtils.closeAndWrapExceptions(quietCloseable); + assertClosed(quietCloseable); + } + + @Test + public void test_closeAndWrapExceptions_ioException() + { + Exception e = null; + try { + CloseableUtils.closeAndWrapExceptions(ioExceptionCloseable); + } + catch (Exception e1) { + e = e1; + } + + assertClosed(ioExceptionCloseable); + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + } + + @Test + public void test_closeAndWrapExceptions_runtimeException() + { + Exception e = null; + try { + CloseableUtils.closeAndWrapExceptions(runtimeExceptionCloseable); + } + catch (Exception e1) { + e = e1; + } + + assertClosed(runtimeExceptionCloseable); + Assert.assertThat(e, CoreMatchers.instanceOf(IllegalArgumentException.class)); + } + + @Test + public void test_closeAndWrapExceptions_assertionError() + { + Throwable e = null; + try { + CloseableUtils.closeAndWrapExceptions(assertionErrorCloseable); + } + catch (Throwable e1) { + e = e1; + } + + assertClosed(assertionErrorCloseable); + Assert.assertThat(e, CoreMatchers.instanceOf(AssertionError.class)); + } + + @Test + public void test_closeAndSuppressExceptions_null() + { + CloseableUtils.closeAndSuppressExceptions(null, chomper); + Assert.assertEquals(0, chomped.get()); + } + + @Test + public void test_closeAndSuppressExceptions_quiet() + { + CloseableUtils.closeAndSuppressExceptions(quietCloseable, chomper); + assertClosed(quietCloseable); + Assert.assertEquals(0, chomped.get()); + } + + @Test + public void test_closeAndSuppressExceptions_ioException() + { + CloseableUtils.closeAndSuppressExceptions(ioExceptionCloseable, chomper); + assertClosed(ioExceptionCloseable); + Assert.assertEquals(1, chomped.get()); + } + + @Test + public void test_closeAndSuppressExceptions_runtimeException() + { + CloseableUtils.closeAndSuppressExceptions(runtimeExceptionCloseable, chomper); + assertClosed(runtimeExceptionCloseable); + Assert.assertEquals(1, chomped.get()); + } + + @Test + public void test_closeAndSuppressExceptions_assertionError() + { + CloseableUtils.closeAndSuppressExceptions(assertionErrorCloseable, chomper); + assertClosed(assertionErrorCloseable); + Assert.assertEquals(1, chomped.get()); + } + + @Test + public void test_closeInCatch_improper() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeInCatch(null, quietCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(quietCloseable.isClosed()); + + Assert.assertThat(e, CoreMatchers.instanceOf(IllegalStateException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Must be called with non-null caught exception")) + ); + } + + @Test + public void test_closeInCatch_quiet() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeInCatch(new RuntimeException("this one was caught"), quietCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(quietCloseable.isClosed()); + + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ); + } + + @Test + public void test_closeInCatch_ioException() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeInCatch(new IOException("this one was caught"), ioExceptionCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(ioExceptionCloseable.isClosed()); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(IOException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ); + + // Second exception + Assert.assertEquals(1, e.getSuppressed().length); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IOException.class)); + } + + @Test + public void test_closeInCatch_runtimeException() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeInCatch(new RuntimeException("this one was caught"), runtimeExceptionCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(runtimeExceptionCloseable.isClosed()); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ); + + // Second exception + Assert.assertEquals(1, e.getSuppressed().length); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IllegalArgumentException.class)); + } + + @Test + public void test_closeAndWrapInCatch_improper() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeAndWrapInCatch(null, quietCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(quietCloseable.isClosed()); + + Assert.assertThat(e, CoreMatchers.instanceOf(IllegalStateException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Must be called with non-null caught exception")) + ); + } + + @Test + public void test_closeAndWrapInCatch_quiet() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeAndWrapInCatch(new RuntimeException("this one was caught"), quietCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(quietCloseable.isClosed()); + + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ); + } + + @Test + public void test_closeAndWrapInCatch_ioException() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeAndWrapInCatch(new IOException("this one was caught"), ioExceptionCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(ioExceptionCloseable.isClosed()); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("java.io.IOException: this one was caught")) + ); + Assert.assertThat(e, ThrowableCauseMatcher.hasCause(CoreMatchers.instanceOf(IOException.class))); + Assert.assertThat( + e, + ThrowableCauseMatcher.hasCause( + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ) + ); + + // Second exception + Assert.assertEquals(1, e.getCause().getSuppressed().length); + Assert.assertThat(e.getCause().getSuppressed()[0], CoreMatchers.instanceOf(IOException.class)); + } + + @Test + public void test_closeAndWrapInCatch_runtimeException() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeAndWrapInCatch(new RuntimeException("this one was caught"), runtimeExceptionCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(runtimeExceptionCloseable.isClosed()); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ); + + // Second exception + Assert.assertEquals(1, e.getSuppressed().length); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IllegalArgumentException.class)); + } + + @Test + public void test_closeAndWrapInCatch_assertionError() + { + Exception e = null; + try { + //noinspection ThrowableNotThrown + CloseableUtils.closeAndWrapInCatch(new RuntimeException("this one was caught"), assertionErrorCloseable); + } + catch (Exception e1) { + e = e1; + } + + Assert.assertTrue(assertionErrorCloseable.isClosed()); + + // First exception + Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class)); + Assert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught")) + ); + + // Second exception + Assert.assertEquals(1, e.getSuppressed().length); + Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(AssertionError.class)); + } + + private static void assertClosed(final TestCloseable... closeables) + { + for (TestCloseable closeable : closeables) { + Assert.assertTrue(closeable.isClosed()); + } + } + + private static class TestCloseable implements Closeable + { + @Nullable + private final Throwable e; + private final AtomicBoolean closed = new AtomicBoolean(false); + + TestCloseable(@Nullable Throwable e) + { + this.e = e; + } + + @Override + public void close() throws IOException + { + closed.set(true); + if (e != null) { + Throwables.propagateIfInstanceOf(e, IOException.class); + throw Throwables.propagate(e); + } + } + + public boolean isClosed() + { + return closed.get(); + } + } +} diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidLeaderSelector.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidLeaderSelector.java index 2cf5a8d581b..e8b304218ba 100644 --- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidLeaderSelector.java +++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidLeaderSelector.java @@ -25,9 +25,9 @@ import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.DruidNode; +import org.apache.druid.utils.CloseableUtils; import javax.annotation.Nullable; @@ -45,7 +45,13 @@ public class K8sDruidLeaderSelector implements DruidLeaderSelector @SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT", justification = "incremented but in single thread") private volatile int term = 0; - public K8sDruidLeaderSelector(@Self DruidNode self, String lockResourceName, String lockResourceNamespace, K8sDiscoveryConfig discoveryConfig, K8sLeaderElectorFactory k8sLeaderElectorFactory) + public K8sDruidLeaderSelector( + @Self DruidNode self, + String lockResourceName, + String lockResourceNamespace, + K8sDiscoveryConfig discoveryConfig, + K8sLeaderElectorFactory k8sLeaderElectorFactory + ) { this.leaderLatch = new LeaderElectorAsyncWrapper( self.getServiceScheme() + "://" + self.getHostAndPortToUse(), @@ -72,8 +78,7 @@ public class K8sDruidLeaderSelector implements DruidLeaderSelector } catch (Throwable ex) { LOGGER.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); - - CloseQuietly.close(leaderLatch); + closeLeaderLatchQuietly(); leader = false; //Exit and Kubernetes would simply create a new replacement pod. System.exit(1); @@ -147,6 +152,15 @@ public class K8sDruidLeaderSelector implements DruidLeaderSelector if (!lifecycleLock.canStop()) { throw new ISE("can't stop."); } - CloseQuietly.close(leaderLatch); + + closeLeaderLatchQuietly(); + } + + private void closeLeaderLatchQuietly() + { + CloseableUtils.closeAndSuppressExceptions( + leaderLatch, + e -> LOGGER.warn("Exception caught while cleaning up leader latch") + ); } } diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java index 08dfafd1e51..6aa271b6d90 100644 --- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java +++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java @@ -32,11 +32,11 @@ import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.DruidNode; +import org.apache.druid.utils.CloseableUtils; import java.io.Closeable; import java.net.SocketTimeoutException; @@ -340,7 +340,8 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider try { LOGGER.info("Stopping NodeRoleWatcher for [%s]...", nodeRole); - CloseQuietly.close(watchRef.getAndSet(STOP_MARKER)); + // STOP_MARKER cannot throw exceptions on close(), so this is OK. + CloseableUtils.closeAndSuppressExceptions(STOP_MARKER, e -> {}); watchExecutor.shutdownNow(); if (!watchExecutor.awaitTermination(15, TimeUnit.SECONDS)) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 4e1c5e7a275..30382eaefbd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -61,7 +61,6 @@ import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.NoopQueryRunner; @@ -86,6 +85,7 @@ import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import org.apache.druid.segment.realtime.plumber.Committers; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.partition.NumberedPartialShardSpec; +import org.apache.druid.utils.CloseableUtils; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import javax.servlet.http.HttpServletRequest; @@ -428,9 +428,9 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements finally { toolbox.getChatHandlerProvider().unregister(getId()); - CloseQuietly.close(firehose); + CloseableUtils.closeAndSuppressExceptions(firehose, e -> log.warn("Failed to close Firehose")); appenderator.close(); - CloseQuietly.close(driver); + CloseableUtils.closeAndSuppressExceptions(driver, e -> log.warn("Failed to close AppenderatorDriver")); toolbox.removeMonitor(metricsMonitor); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java index 6a5fc698daf..85729d7ab8b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java @@ -43,7 +43,6 @@ import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.NoopQueryRunner; import org.apache.druid.query.Query; @@ -67,6 +66,7 @@ import org.apache.druid.segment.realtime.plumber.RealtimePlumberSchool; import org.apache.druid.segment.realtime.plumber.VersioningPolicy; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.utils.CloseableUtils; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -302,7 +302,11 @@ public class RealtimeIndexTask extends AbstractTask { try { // Side effect: Calling getVersion causes a lock to be acquired - final TimeChunkLockAcquireAction action = new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, interval, lockTimeoutMs); + final TimeChunkLockAcquireAction action = new TimeChunkLockAcquireAction( + TaskLockType.EXCLUSIVE, + interval, + lockTimeoutMs + ); final TaskLock lock = Preconditions.checkNotNull( toolbox.getTaskActionClient().submit(action), "Cannot acquire a lock for interval[%s]", @@ -471,7 +475,7 @@ public class RealtimeIndexTask extends AbstractTask } finally { if (firehose != null) { - CloseQuietly.close(firehose); + CloseableUtils.closeAndSuppressExceptions(firehose, e -> log.warn("Failed to close Firehose")); } toolbox.removeMonitor(metricsMonitor); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java index f48d65b4c70..a4d1c2e793e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java @@ -38,7 +38,6 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielders; -import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.query.filter.DimFilter; @@ -58,6 +57,7 @@ import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.data.IndexedInts; import org.apache.druid.segment.filter.Filters; import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter; +import org.apache.druid.utils.CloseableUtils; import org.apache.druid.utils.CollectionUtils; import java.io.File; @@ -208,10 +208,7 @@ public class DruidSegmentReader extends IntermediateRowParsingReader( () -> Sequences.withBaggage( memoizedSupplier.get().results(dimsToInclude), - closeOnSequenceRead ? () -> CloseQuietly.close(memoizedSupplier.get()) : () -> {} + closeOnSequenceRead + ? () -> CloseableUtils.closeAndWrapExceptions(memoizedSupplier.get()) + : () -> {} ) ), subtotalQuery, null ); } - catch (Exception ex) { - CloseQuietly.close(baseResultsSupplier.get()); - throw ex; + catch (Throwable e) { + throw CloseableUtils.closeAndWrapInCatch(e, baseResultsSupplier.get()); } } diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java index 78a369a2b89..413cc8889fc 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java @@ -27,7 +27,6 @@ import com.google.inject.Inject; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.guava.BaseSequence; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.GenericQueryMetricsFactory; @@ -39,6 +38,7 @@ import org.apache.druid.query.aggregation.MetricManipulationFn; import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.utils.CloseableUtils; import java.util.List; import java.util.Map; @@ -108,7 +108,7 @@ public class ScanQueryQueryToolChest extends QueryToolChest @Override protected void loadBuffer(int bufferNum) { - CloseQuietly.close(holder); + if (holder != null) { + holder.close(); + } holder = singleThreadedLongBuffers.get(bufferNum); buffer = holder.get(); // asLongBuffer() makes the longBuffer's position = 0 @@ -190,7 +191,9 @@ public class BlockLayoutColumnarLongsSupplier implements Supplier protected void loadBuffer(int bufferNum) { - CloseQuietly.close(holder); + if (holder != null) { + holder.close(); + } holder = singleThreadedLongBuffers.get(bufferNum); buffer = holder.get(); currBufferNum = bufferNum; diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java index d9b3cf97085..e685721eb88 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java @@ -24,7 +24,6 @@ import com.google.common.base.Preconditions; import it.unimi.dsi.fastutil.ints.IntArrayList; import org.apache.druid.collections.ResourceHolder; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; @@ -134,7 +133,11 @@ public class CompressedColumnarIntsSupplier implements WritableSupplier holder; - /** buffer's position must be 0 */ + /** + * buffer's position must be 0 + */ IntBuffer buffer; @Override @@ -317,7 +322,9 @@ public class CompressedColumnarIntsSupplier implements WritableSupplier implements CloseableIndexed, Serializer headerOut.writeInt(Ints.checkedCast(valuesOut.size())); if (prevVal instanceof Closeable) { - CloseQuietly.close((Closeable) prevVal); + CloseableUtils.closeAndWrapExceptions((Closeable) prevVal); } prevVal = next; } while (objects.hasNext()); if (prevVal instanceof Closeable) { - CloseQuietly.close((Closeable) prevVal); + CloseableUtils.closeAndWrapExceptions((Closeable) prevVal); } } catch (IOException e) { diff --git a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java index 34ac51c2b54..7ade12eacae 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java @@ -20,14 +20,15 @@ package org.apache.druid.segment.join; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.filter.Filter; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.utils.CloseableUtils; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -43,6 +44,8 @@ import java.util.Optional; */ public class HashJoinSegment implements SegmentReference { + private static final Logger log = new Logger(HashJoinSegment.class); + private final SegmentReference baseSegment; private final Filter baseFilter; private final List clauses; @@ -131,14 +134,16 @@ public class HashJoinSegment implements SegmentReference }).orElse(true); } if (acquireFailed) { - CloseQuietly.close(closer); + CloseableUtils.closeAndWrapExceptions(closer); return Optional.empty(); } else { return Optional.of(closer); } } - catch (Exception ex) { - CloseQuietly.close(closer); + catch (Throwable e) { + // acquireReferences is not permitted to throw exceptions. + CloseableUtils.closeAndSuppressExceptions(closer, e::addSuppressed); + log.warn(e, "Exception encountered while trying to acquire reference"); return Optional.empty(); } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java index 1ed990b5f97..b8bc3f051bb 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java @@ -41,7 +41,6 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularity; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielder; @@ -85,6 +84,7 @@ import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.utils.CloseableUtils; import org.junit.rules.TemporaryFolder; import java.io.Closeable; @@ -712,7 +712,7 @@ public class AggregationTestHelper implements Closeable } finally { for (Segment segment : segments) { - CloseQuietly.close(segment); + CloseableUtils.closeAndWrapExceptions(segment); } } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java index 30419b04740..2e516cebf63 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java @@ -28,7 +28,6 @@ import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.Row; import org.apache.druid.data.input.impl.NoopInputRowParser; import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.Druids; import org.apache.druid.query.Result; @@ -41,6 +40,7 @@ import org.apache.druid.segment.IncrementalIndexSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.utils.CloseableUtils; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.After; @@ -133,7 +133,7 @@ public class StringColumnAggregationTest { if (segments != null) { for (Segment seg : segments) { - CloseQuietly.close(seg); + CloseableUtils.closeAndWrapExceptions(seg); } } } diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java index 43ea5eb0863..52dca67257c 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java @@ -26,7 +26,6 @@ import it.unimi.dsi.fastutil.ints.IntArrayList; import org.apache.commons.io.IOUtils; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.Smoosh; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; @@ -35,6 +34,7 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.WriteOutBytes; +import org.apache.druid.utils.CloseableUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -226,7 +226,7 @@ public class CompressedColumnarIntsSerializerTest for (int i = 0; i < vals.length; ++i) { Assert.assertEquals(vals[i], columnarInts.get(i)); } - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); } private void checkV2SerializedSizeAndData(int chunkFactor) throws Exception @@ -269,7 +269,7 @@ public class CompressedColumnarIntsSerializerTest for (int i = 0; i < vals.length; ++i) { Assert.assertEquals(vals[i], columnarInts.get(i)); } - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); mapper.close(); } } diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java index a857eb4256c..01c9cc26dca 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java @@ -22,9 +22,9 @@ package org.apache.druid.segment.data; import com.google.common.primitives.Longs; import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.segment.CompressedPools; +import org.apache.druid.utils.CloseableUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -58,7 +58,7 @@ public class CompressedColumnarIntsSupplierTest extends CompressionStrategyTest public void setUp() { closer = Closer.create(); - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); columnarInts = null; supplier = null; vals = null; @@ -68,12 +68,12 @@ public class CompressedColumnarIntsSupplierTest extends CompressionStrategyTest public void tearDown() throws Exception { closer.close(); - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); } private void setupSimple(final int chunkSize) { - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); vals = new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16}; @@ -97,7 +97,7 @@ public class CompressedColumnarIntsSupplierTest extends CompressionStrategyTest private void makeWithSerde(final int chunkSize) throws IOException { - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); ByteArrayOutputStream baos = new ByteArrayOutputStream(); final CompressedColumnarIntsSupplier theSupplier = CompressedColumnarIntsSupplier.fromIntBuffer( @@ -271,7 +271,7 @@ public class CompressedColumnarIntsSupplierTest extends CompressionStrategyTest stopLatch.await(); } finally { - CloseQuietly.close(columnarInts2); + CloseableUtils.closeAndWrapExceptions(columnarInts2); } if (failureHappened.get()) { diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java index 284aea33b75..338897be752 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java @@ -23,10 +23,10 @@ import com.google.common.base.Supplier; import com.google.common.primitives.Doubles; import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; +import org.apache.druid.utils.CloseableUtils; import org.junit.Assert; import org.junit.Ignore; import org.junit.Rule; @@ -321,7 +321,7 @@ public class CompressedDoublesSerdeTest stopLatch.await(); } finally { - CloseQuietly.close(indexed2); + CloseableUtils.closeAndWrapExceptions(indexed2); } if (failureHappened.get()) { diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java index d11c089abf2..c831170f48f 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java @@ -23,10 +23,10 @@ import com.google.common.base.Supplier; import com.google.common.primitives.Floats; import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; +import org.apache.druid.utils.CloseableUtils; import org.junit.Assert; import org.junit.Ignore; import org.junit.Rule; @@ -78,8 +78,29 @@ public class CompressedFloatsSerdeTest private final float[] values2 = {13.2f, 6.1f, 0.001f, 123f, 12572f, 123.1f, 784.4f, 6892.8634f, 8.341111f}; private final float[] values3 = {0.001f, 0.001f, 0.001f, 0.001f, 0.001f, 100f, 100f, 100f, 100f, 100f}; private final float[] values4 = {0f, 0f, 0f, 0f, 0.01f, 0f, 0f, 0f, 21.22f, 0f, 0f, 0f, 0f, 0f, 0f}; - private final float[] values5 = {123.16f, 1.12f, 62.00f, 462.12f, 517.71f, 56.54f, 971.32f, 824.22f, 472.12f, 625.26f}; - private final float[] values6 = {1000000f, 1000001f, 1000002f, 1000003f, 1000004f, 1000005f, 1000006f, 1000007f, 1000008f}; + private final float[] values5 = { + 123.16f, + 1.12f, + 62.00f, + 462.12f, + 517.71f, + 56.54f, + 971.32f, + 824.22f, + 472.12f, + 625.26f + }; + private final float[] values6 = { + 1000000f, + 1000001f, + 1000002f, + 1000003f, + 1000004f, + 1000005f, + 1000006f, + 1000007f, + 1000008f + }; private final float[] values7 = { Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY, 12378.5734f, -12718243.7496f, -93653653.1f, 12743153.385534f, 21431.414538f, 65487435436632.123f, -43734526234564.65f @@ -321,7 +342,7 @@ public class CompressedFloatsSerdeTest stopLatch.await(); } finally { - CloseQuietly.close(indexed2); + CloseableUtils.closeAndWrapExceptions(indexed2); } if (failureHappened.get()) { diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java index f0effe78a2d..6d6fe4549f9 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java @@ -23,10 +23,10 @@ import com.google.common.base.Supplier; import com.google.common.primitives.Longs; import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; +import org.apache.druid.utils.CloseableUtils; import org.junit.Assert; import org.junit.Ignore; import org.junit.Rule; @@ -356,7 +356,7 @@ public class CompressedLongsSerdeTest stopLatch.await(); } finally { - CloseQuietly.close(indexed2); + CloseableUtils.closeAndWrapExceptions(indexed2); } if (failureHappened.get()) { diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java index 5ba842013bb..bb5868f6df2 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java @@ -25,7 +25,6 @@ import com.google.common.collect.Sets; import com.google.common.primitives.Ints; import it.unimi.dsi.fastutil.ints.IntArrayList; import org.apache.commons.io.IOUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.Smoosh; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; @@ -34,6 +33,7 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.WriteOutBytes; +import org.apache.druid.utils.CloseableUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -154,7 +154,7 @@ public class CompressedVSizeColumnarIntsSerializerTest for (int i = 0; i < vals.length; ++i) { Assert.assertEquals(vals[i], columnarInts.get(i)); } - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); } @Test @@ -268,8 +268,7 @@ public class CompressedVSizeColumnarIntsSerializerTest for (int i = 0; i < vals.length; ++i) { Assert.assertEquals(vals[i], columnarInts.get(i)); } - CloseQuietly.close(columnarInts); - mapper.close(); + CloseableUtils.closeAll(columnarInts, mapper); } @Test diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplierTest.java index f113dc3510a..48a443ef661 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplierTest.java @@ -27,9 +27,9 @@ import com.google.common.primitives.Longs; import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.segment.CompressedPools; +import org.apache.druid.utils.CloseableUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -67,7 +67,7 @@ public class CompressedVSizeColumnarIntsSupplierTest extends CompressionStrategy ); } - private static final int[] MAX_VALUES = new int[] {0xFF, 0xFFFF, 0xFFFFFF, 0x0FFFFFFF}; + private static final int[] MAX_VALUES = new int[]{0xFF, 0xFFFF, 0xFFFFFF, 0x0FFFFFFF}; public CompressedVSizeColumnarIntsSupplierTest(CompressionStrategy compressionStrategy, ByteOrder byteOrder) { @@ -86,7 +86,7 @@ public class CompressedVSizeColumnarIntsSupplierTest extends CompressionStrategy public void setUp() { closer = Closer.create(); - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); columnarInts = null; supplier = null; vals = null; @@ -95,13 +95,12 @@ public class CompressedVSizeColumnarIntsSupplierTest extends CompressionStrategy @After public void tearDown() throws Exception { - CloseQuietly.close(columnarInts); - closer.close(); + CloseableUtils.closeAll(columnarInts, closer); } private void setupSimple(final int chunkSize) { - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); vals = new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16}; @@ -126,7 +125,7 @@ public class CompressedVSizeColumnarIntsSupplierTest extends CompressionStrategy private void makeWithSerde(final int chunkSize) throws IOException { - CloseQuietly.close(columnarInts); + CloseableUtils.closeAndWrapExceptions(columnarInts); ByteArrayOutputStream baos = new ByteArrayOutputStream(); final CompressedVSizeColumnarIntsSupplier theSupplier = CompressedVSizeColumnarIntsSupplier.fromList( @@ -212,8 +211,14 @@ public class CompressedVSizeColumnarIntsSupplierTest extends CompressionStrategy public void testmaxIntsInBuffer() { Assert.assertEquals(CompressedPools.BUFFER_SIZE, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(1)); - Assert.assertEquals(CompressedPools.BUFFER_SIZE / 2, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(2)); - Assert.assertEquals(CompressedPools.BUFFER_SIZE / 4, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(4)); + Assert.assertEquals( + CompressedPools.BUFFER_SIZE / 2, + CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(2) + ); + Assert.assertEquals( + CompressedPools.BUFFER_SIZE / 4, + CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(4) + ); Assert.assertEquals(CompressedPools.BUFFER_SIZE, 0x10000); // nearest power of 2 is 2^14 Assert.assertEquals(1 << 14, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(3)); @@ -330,7 +335,7 @@ public class CompressedVSizeColumnarIntsSupplierTest extends CompressionStrategy stopLatch.await(); } finally { - CloseQuietly.close(columnarInts2); + CloseableUtils.closeAndWrapExceptions(columnarInts2); } if (failureHappened.get()) { diff --git a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java index de6485fff1f..6877416e764 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java @@ -25,7 +25,6 @@ import com.google.common.collect.Sets; import org.apache.commons.io.IOUtils; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.Smoosh; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; @@ -34,6 +33,7 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.WriteOutBytes; +import org.apache.druid.utils.CloseableUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; @@ -322,12 +322,17 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest Assert.assertEquals(subVals.get(j), vals.get(i)[j]); } } - CloseQuietly.close(columnarMultiInts); - mapper.close(); + CloseableUtils.closeAll(columnarMultiInts, mapper); } } - private void generateV2SerializedSizeAndData(long numRows, int maxValue, int maxValuesPerRow, int offsetChunkFactor, int valueChunkFactor) throws Exception + private void generateV2SerializedSizeAndData( + long numRows, + int maxValue, + int maxValuesPerRow, + int offsetChunkFactor, + int valueChunkFactor + ) throws Exception { File tmpDirectory = FileUtils.createTempDir(StringUtils.format( "CompressedVSizeIndexedV3WriterTest_%d_%d", @@ -395,8 +400,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest Assert.assertEquals(subVals.get(j), expected[j]); } } - CloseQuietly.close(columnarMultiInts); - mapper.close(); + CloseableUtils.closeAll(columnarMultiInts, mapper); } } } diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index 75a2a4507e4..ead9e2d3274 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -32,7 +32,6 @@ import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.BaseSequence; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.logger.Logger; @@ -58,6 +57,7 @@ import org.apache.druid.query.context.ConcurrentResponseContext; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.context.ResponseContext.Key; import org.apache.druid.server.QueryResource; +import org.apache.druid.utils.CloseableUtils; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.handler.codec.http.HttpChunk; @@ -518,7 +518,7 @@ public class DirectDruidClient implements QueryRunner @Override public void cleanup(JsonParserIterator iterFromMake) { - CloseQuietly.close(iterFromMake); + CloseableUtils.closeAndWrapExceptions(iterFromMake); } } ); diff --git a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java index b7b4d631e42..35532276e93 100644 --- a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java +++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java @@ -26,7 +26,6 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.Query; import org.apache.druid.query.QueryCapacityExceededException; @@ -35,6 +34,7 @@ import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.QueryUnsupportedException; import org.apache.druid.query.ResourceLimitExceededException; +import org.apache.druid.utils.CloseableUtils; import javax.annotation.Nullable; import java.io.Closeable; @@ -96,7 +96,7 @@ public class JsonParserIterator implements Iterator, Closeable return false; } if (jp.getCurrentToken() == JsonToken.END_ARRAY) { - CloseQuietly.close(jp); + CloseableUtils.closeAndWrapExceptions(jp); return false; } diff --git a/server/src/main/java/org/apache/druid/client/cache/HybridCache.java b/server/src/main/java/org/apache/druid/client/cache/HybridCache.java index 65bf957a27e..509b19aa653 100644 --- a/server/src/main/java/org/apache/druid/client/cache/HybridCache.java +++ b/server/src/main/java/org/apache/druid/client/cache/HybridCache.java @@ -156,7 +156,7 @@ public class HybridCache implements Cache @LifecycleStop public void close() throws IOException { - CloseableUtils.closeBoth(level1, level2); + CloseableUtils.closeAll(level1, level2); } @Override diff --git a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java index 533389a02c3..b65d85644d4 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java @@ -31,15 +31,15 @@ import org.apache.curator.utils.ZKPaths; import org.apache.druid.curator.cache.PathChildrenCacheFactory; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.guava.CloseQuietly; -import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.utils.CloseableUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -135,12 +135,11 @@ public class Announcer started = false; - Closer closer = Closer.create(); - for (PathChildrenCache cache : listeners.values()) { - closer.register(cache); - } try { - CloseQuietly.close(closer); + CloseableUtils.closeAll(listeners.values()); + } + catch (IOException e) { + throw new RuntimeException(e); } finally { pathChildrenCacheExecutor.shutdown(); @@ -413,9 +412,8 @@ public class Announcer try { cache.start(); } - catch (Exception e) { - CloseQuietly.close(cache); - throw new RuntimeException(e); + catch (Throwable e) { + throw CloseableUtils.closeAndWrapInCatch(e, cache); } } diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java index f3c0afe326c..1aa77cec5fd 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -30,9 +30,9 @@ import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.DruidNode; +import org.apache.druid.utils.CloseableUtils; import javax.annotation.Nullable; import java.util.concurrent.ExecutorService; @@ -40,6 +40,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; /** + * */ public class CuratorDruidLeaderSelector implements DruidLeaderSelector { @@ -100,8 +101,11 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); // give others a chance to become leader. - final LeaderLatch oldLatch = createNewLeaderLatchWithListener(); - CloseQuietly.close(oldLatch); + CloseableUtils.closeAndSuppressExceptions( + createNewLeaderLatchWithListener(), + e -> log.warn("Could not close old leader latch; continuing with new one anyway.") + ); + leader = false; try { //Small delay before starting the latch so that others waiting are chosen to become leader. @@ -207,7 +211,8 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector if (!lifecycleLock.canStop()) { throw new ISE("can't stop."); } - CloseQuietly.close(leaderLatch.get()); + + CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> log.warn(e, "Failed to close LeaderLatch.")); listenerExecutor.shutdownNow(); } } diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java index 2f60bd12ab8..4639a6c5177 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java @@ -155,7 +155,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide closer.registerAll(nodeRoleWatchers.values()); closer.registerAll(nodeDiscoverers); - CloseableUtils.closeBoth(closer, listenerExecutor::shutdownNow); + CloseableUtils.closeAll(closer, listenerExecutor::shutdownNow); } private static class NodeRoleWatcher implements DruidNodeDiscovery, Closeable @@ -213,7 +213,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide @Override public void close() throws IOException { - CloseableUtils.closeBoth(cache, cacheExecutor::shutdownNow); + CloseableUtils.closeAll(cache, cacheExecutor::shutdownNow); } @Override diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index da626c1f768..ae6994134bb 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -32,7 +32,6 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.FunctionalIterable; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -68,6 +67,7 @@ import org.apache.druid.segment.realtime.plumber.Sink; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; +import org.apache.druid.utils.CloseableUtils; import org.joda.time.Interval; import java.io.Closeable; @@ -259,9 +259,8 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker ); return new Pair<>(segmentAndCloseable.lhs.getDataInterval(), runner); } - catch (RuntimeException e) { - CloseQuietly.close(segmentAndCloseable.rhs); - throw e; + catch (Throwable e) { + throw CloseableUtils.closeAndWrapInCatch(e, segmentAndCloseable.rhs); } } ) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java index 52e805a2361..88d6d6ede22 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java @@ -119,7 +119,7 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory log.warn("Could not close log file for %s. Creating new log file anyway.", currentDay) + ); + fileWriter = getFileWriter(); } } @@ -124,7 +133,7 @@ public class FileRequestLogger implements RequestLogger public void stop() { synchronized (lock) { - CloseQuietly.close(fileWriter); + CloseableUtils.closeAndWrapExceptions(fileWriter); } } diff --git a/server/src/test/java/org/apache/druid/client/cache/HybridCacheTest.java b/server/src/test/java/org/apache/druid/client/cache/HybridCacheTest.java index b5d96ebe2c2..aff62b55571 100644 --- a/server/src/test/java/org/apache/druid/client/cache/HybridCacheTest.java +++ b/server/src/test/java/org/apache/druid/client/cache/HybridCacheTest.java @@ -35,6 +35,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.util.HashSet; import java.util.Map; @@ -82,10 +83,12 @@ public class HybridCacheTest } @Test - public void testSanity() + public void testSanity() throws IOException { - final MapCache l1 = new MapCache(new ByteCountingLRUMap(1024 * 1024)); - final MapCache l2 = new MapCache(new ByteCountingLRUMap(1024 * 1024)); + final ByteCountingLRUMap l1Map = new ByteCountingLRUMap(1024 * 1024); + final ByteCountingLRUMap l2Map = new ByteCountingLRUMap(1024 * 1024); + final MapCache l1 = new MapCache(l1Map); + final MapCache l2 = new MapCache(l2Map); HybridCache cache = new HybridCache(new HybridCacheConfig(), l1, l2); final Cache.NamedKey key1 = new Cache.NamedKey("a", HI); @@ -175,5 +178,10 @@ public class HybridCacheTest Assert.assertEquals(hits + 1, cache.getStats().getNumHits()); Assert.assertEquals(misses + 1, cache.getStats().getNumMisses()); } + + // test close + cache.close(); + Assert.assertEquals("l1 size after close()", 0, l1Map.size()); + Assert.assertEquals("l2 size after close()", 0, l2Map.size()); } }