Remove CloseQuietly and migrate its usages to other methods. (#10247)

* Remove CloseQuietly and migrate its usages to other methods.

These other methods include:

1) New method CloseableUtils.closeAndWrapExceptions, which wraps IOExceptions
   in RuntimeExceptions for callers that just want to avoid dealing with
   checked exceptions. Most usages were migrated to this method, because it
   looks like they were mainly attempts to avoid declaring a throws clause,
   and perhaps were unintentionally suppressing IOExceptions.
2) New method CloseableUtils.closeInCatch, designed to properly close something
   in a catch block without losing exceptions. Some usages from catch blocks
   were migrated here, when it seemed that they were intended to avoid checked
   exception handling, and did not really intend to also suppress IOExceptions.
3) New method CloseableUtils.closeAndSuppressExceptions, which sends all
   exceptions to a "chomper" that consumes them. Nothing is thrown or returned.
   The behavior is slightly different: with this method, _all_ exceptions are
   suppressed, not just IOExceptions. Calls that seemed like they had good
   reason to suppress exceptions were migrated here.
4) Some calls were migrated to try-with-resources, in cases where it appeared
   that CloseQuietly was being used to avoid throwing an exception in a finally
   block.

🎵 You don't have to go home, but you can't stay here... 🎵

* Remove unused import.

* Fix up various issues.

* Adjustments to tests.

* Fix null handling.

* Additional test.

* Adjustments from review.

* Fixup style stuff.

* Fix NPE caused by holder starting out null.

* Fix spelling.

* Chomp Throwables too.
This commit is contained in:
Gian Merlino 2021-10-23 17:03:21 -07:00 committed by GitHub
parent 44a7b09190
commit 98ecbb21cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 869 additions and 285 deletions

View File

@ -25,9 +25,8 @@ import com.fasterxml.jackson.core.ObjectCodec;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.utils.CloseableUtils;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -80,7 +79,7 @@ public class JsonIterator<T> implements CloseableIterator<T>
return false; return false;
} }
if (jp.getCurrentToken() == JsonToken.END_ARRAY) { if (jp.getCurrentToken() == JsonToken.END_ARRAY) {
CloseQuietly.close(jp); CloseableUtils.closeAndWrapExceptions(jp);
return false; return false;
} }
return true; return true;
@ -131,11 +130,6 @@ public class JsonIterator<T> implements CloseableIterator<T>
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
Closer closer = Closer.create(); CloseableUtils.closeAll(jp, resourceCloser);
if (jp != null) {
closer.register(jp);
}
closer.register(resourceCloser);
closer.close();
} }
} }

View File

@ -1,45 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common.guava;
import org.apache.druid.java.util.common.logger.Logger;
import java.io.Closeable;
import java.io.IOException;
/**
*/
public class CloseQuietly
{
private static final Logger log = new Logger(CloseQuietly.class);
public static void close(Closeable closeable)
{
if (closeable == null) {
return;
}
try {
closeable.close();
}
catch (IOException e) {
log.error(e, "IOException thrown while closing Closeable.");
}
}
}

View File

@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.utils.CloseableUtils;
import org.apache.druid.utils.JvmUtils; import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -175,6 +176,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
new BaseSequence.IteratorMaker<T, Iterator<T>>() new BaseSequence.IteratorMaker<T, Iterator<T>>()
{ {
private boolean shouldCancelOnCleanup = true; private boolean shouldCancelOnCleanup = true;
@Override @Override
public Iterator<T> make() public Iterator<T> make()
{ {
@ -463,7 +465,8 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
final int computedNumParallelTasks = Math.max(computedOptimalParallelism, 1); final int computedNumParallelTasks = Math.max(computedOptimalParallelism, 1);
LOG.debug("Computed parallel tasks: [%s]; ForkJoinPool details - sequence parallelism: [%s] " LOG.debug(
"Computed parallel tasks: [%s]; ForkJoinPool details - sequence parallelism: [%s] "
+ "active threads: [%s] running threads: [%s] queued submissions: [%s] queued tasks: [%s] " + "active threads: [%s] running threads: [%s] queued submissions: [%s] queued tasks: [%s] "
+ "pool parallelism: [%s] pool size: [%s] steal count: [%s]", + "pool parallelism: [%s] pool size: [%s] steal count: [%s]",
computedNumParallelTasks, computedNumParallelTasks,
@ -609,7 +612,10 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
// which we want to target a 10ms task run time. smooth this value with a cumulative moving average in order // which we want to target a 10ms task run time. smooth this value with a cumulative moving average in order
// to prevent normal jitter in processing time from skewing the next yield value too far in any direction // to prevent normal jitter in processing time from skewing the next yield value too far in any direction
final long elapsedNanos = System.nanoTime() - start; final long elapsedNanos = System.nanoTime() - start;
final double nextYieldAfter = Math.max((double) targetTimeNanos * ((double) yieldAfter / elapsedCpuNanos), 1.0); final double nextYieldAfter = Math.max(
(double) targetTimeNanos * ((double) yieldAfter / elapsedCpuNanos),
1.0
);
final long recursionDepth = metricsAccumulator.getTaskCount(); final long recursionDepth = metricsAccumulator.getTaskCount();
final double cumulativeMovingAverage = final double cumulativeMovingAverage =
(nextYieldAfter + (recursionDepth * yieldAfter)) / (recursionDepth + 1); (nextYieldAfter + (recursionDepth * yieldAfter)) / (recursionDepth + 1);
@ -1376,6 +1382,6 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
{ {
Closer closer = Closer.create(); Closer closer = Closer.create();
closer.registerAll(cursors); closer.registerAll(cursors);
CloseQuietly.close(closer); CloseableUtils.closeAndSuppressExceptions(closer, e -> LOG.warn(e, "Failed to close result cursors"));
} }
} }

View File

@ -19,8 +19,8 @@
package org.apache.druid.java.util.http.client; package org.apache.druid.java.util.http.client;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.http.client.netty.HttpClientPipelineFactory; import org.apache.druid.java.util.http.client.netty.HttpClientPipelineFactory;
import org.apache.druid.java.util.http.client.pool.ChannelResourceFactory; import org.apache.druid.java.util.http.client.pool.ChannelResourceFactory;
@ -39,17 +39,12 @@ import org.jboss.netty.util.Timer;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore; import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
*
*/ */
public class HttpClientInit public class HttpClientInit
{ {
@ -110,11 +105,8 @@ public class HttpClientInit
public static SSLContext sslContextWithTrustedKeyStore(final String keyStorePath, final String keyStorePassword) public static SSLContext sslContextWithTrustedKeyStore(final String keyStorePath, final String keyStorePassword)
{ {
FileInputStream in = null; try (FileInputStream in = new FileInputStream(keyStorePath)) {
try {
final KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); final KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
in = new FileInputStream(keyStorePath);
ks.load(in, keyStorePassword.toCharArray()); ks.load(in, keyStorePassword.toCharArray());
in.close(); in.close();
@ -125,27 +117,10 @@ public class HttpClientInit
return sslContext; return sslContext;
} }
catch (CertificateException e) { catch (Exception e) {
Throwables.propagateIfPossible(e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
catch (KeyStoreException e) {
throw new RuntimeException(e);
}
catch (KeyManagementException e) {
throw new RuntimeException(e);
}
catch (FileNotFoundException e) {
throw new RuntimeException(e);
}
catch (IOException e) {
throw new RuntimeException(e);
}
finally {
CloseQuietly.close(in);
}
} }
private static ClientBootstrap createBootstrap(Lifecycle lifecycle, Timer timer, int bossPoolSize, int workerPoolSize) private static ClientBootstrap createBootstrap(Lifecycle lifecycle, Timer timer, int bossPoolSize, int workerPoolSize)

View File

@ -20,7 +20,6 @@
package org.apache.druid.java.util.http.client.response; package org.apache.druid.java.util.http.client.response;
import com.google.common.io.ByteSource; import com.google.common.io.ByteSource;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream; import org.jboss.netty.buffer.ChannelBufferInputStream;
@ -57,19 +56,17 @@ public class SequenceInputStreamResponseHandler implements HttpResponseHandler<I
@Override @Override
public ClientResponse<InputStream> handleResponse(HttpResponse response, TrafficCop trafficCop) public ClientResponse<InputStream> handleResponse(HttpResponse response, TrafficCop trafficCop)
{ {
ChannelBufferInputStream channelStream = null; try (ChannelBufferInputStream channelStream = new ChannelBufferInputStream(response.getContent())) {
try {
channelStream = new ChannelBufferInputStream(response.getContent());
queue.put(channelStream); queue.put(channelStream);
} }
catch (IOException e) {
throw new RuntimeException(e);
}
catch (InterruptedException e) { catch (InterruptedException e) {
log.error(e, "Queue appending interrupted"); log.error(e, "Queue appending interrupted");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new RuntimeException(e); throw new RuntimeException(e);
} }
finally {
CloseQuietly.close(channelStream);
}
byteCount.addAndGet(response.getContent().readableBytes()); byteCount.addAndGet(response.getContent().readableBytes());
return ClientResponse.finished( return ClientResponse.finished(
new SequenceInputStream( new SequenceInputStream(
@ -112,21 +109,19 @@ public class SequenceInputStreamResponseHandler implements HttpResponseHandler<I
final ChannelBuffer channelBuffer = chunk.getContent(); final ChannelBuffer channelBuffer = chunk.getContent();
final int bytes = channelBuffer.readableBytes(); final int bytes = channelBuffer.readableBytes();
if (bytes > 0) { if (bytes > 0) {
ChannelBufferInputStream channelStream = null; try (ChannelBufferInputStream channelStream = new ChannelBufferInputStream(channelBuffer)) {
try {
channelStream = new ChannelBufferInputStream(channelBuffer);
queue.put(channelStream); queue.put(channelStream);
// Queue.size() can be expensive in some implementations, but LinkedBlockingQueue.size is just an AtomicLong // Queue.size() can be expensive in some implementations, but LinkedBlockingQueue.size is just an AtomicLong
log.debug("Added stream. Queue length %d", queue.size()); log.debug("Added stream. Queue length %d", queue.size());
} }
catch (IOException e) {
throw new RuntimeException(e);
}
catch (InterruptedException e) { catch (InterruptedException e) {
log.warn(e, "Thread interrupted while adding to queue"); log.warn(e, "Thread interrupted while adding to queue");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new RuntimeException(e); throw new RuntimeException(e);
} }
finally {
CloseQuietly.close(channelStream);
}
byteCount.addAndGet(bytes); byteCount.addAndGet(bytes);
} else { } else {
log.debug("Skipping zero length chunk"); log.debug("Skipping zero length chunk");

View File

@ -19,11 +19,19 @@
package org.apache.druid.utils; package org.apache.druid.utils;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.io.Closer;
import javax.annotation.Nullable;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
/** /**
* Methods in this class could have belonged to {@link org.apache.druid.java.util.common.io.Closer}, but not editing * Methods in this class could have belonged to {@link Closer}, but not editing
* that class to keep its source close to Guava source. * that class to keep its source close to Guava source.
*/ */
public final class CloseableUtils public final class CloseableUtils
@ -34,15 +42,127 @@ public final class CloseableUtils
* first.close(); * first.close();
* second.close(); * second.close();
* *
* to have safety of {@link org.apache.druid.java.util.common.io.Closer}, but without associated boilerplate code * to have safety of {@link Closer}, but without associated boilerplate code
* of creating a Closer and registering objects in it. * of creating a Closer and registering objects in it.
*/ */
public static void closeBoth(Closeable first, Closeable second) throws IOException public static void closeAll(Closeable first, Closeable... others) throws IOException
{ {
//noinspection EmptyTryBlock final List<Closeable> closeables = new ArrayList<>(others.length + 1);
try (Closeable ignore1 = second; closeables.add(first);
Closeable ignore2 = first) { closeables.addAll(Arrays.asList(others));
// piggy-back try-with-resources semantics closeAll(closeables);
}
/**
* Close all the provided {@param closeables}, from first to last.
*/
public static <T extends Closeable> void closeAll(Iterable<T> closeables) throws IOException
{
final Closer closer = Closer.create();
// Register in reverse order, so we close from first to last.
closer.registerAll(Lists.reverse(Lists.newArrayList(closeables)));
closer.close();
}
/**
* Like {@link Closeable#close()}, but guaranteed to throw {@param caught}. Will add any exceptions encountered
* during closing to {@param caught} using {@link Throwable#addSuppressed(Throwable)}.
*
* Should be used like {@code throw CloseableUtils.closeInCatch(e, closeable)}. (The "throw" is important for
* reachability detection.)
*/
public static <E extends Throwable> RuntimeException closeInCatch(
final E caught,
@Nullable final Closeable closeable
) throws E
{
if (caught == null) {
// Incorrect usage; throw an exception with an error message that may be useful to the programmer.
final RuntimeException e1 = new IllegalStateException("Must be called with non-null caught exception");
if (closeable != null) {
try {
closeable.close();
}
catch (Throwable e2) {
e1.addSuppressed(e2);
}
}
throw e1;
}
if (closeable != null) {
try {
closeable.close();
}
catch (Throwable e) {
caught.addSuppressed(e);
}
}
throw caught;
}
/**
* Like {@link #closeInCatch} but wraps {@param caught} in a {@link RuntimeException} if it is a checked exception.
*/
public static <E extends Throwable> RuntimeException closeAndWrapInCatch(
final E caught,
@Nullable final Closeable closeable
)
{
try {
throw closeInCatch(caught, closeable);
}
catch (RuntimeException | Error e) {
// Unchecked exception.
throw e;
}
catch (Throwable e) {
// Checked exception; must wrap.
throw new RuntimeException(e);
}
}
/**
* Like {@link Closeable#close()} but wraps IOExceptions in RuntimeExceptions.
*/
public static void closeAndWrapExceptions(@Nullable final Closeable closeable)
{
if (closeable == null) {
return;
}
try {
closeable.close();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* Like {@link Closeable#close()} but sends any exceptions to the provided Consumer and then returns quietly.
*
* If the Consumer throws an exception, that exception is thrown by this method. So if your intent is to chomp
* exceptions, you should avoid writing a Consumer that might throw an exception.
*/
public static void closeAndSuppressExceptions(
@Nullable final Closeable closeable,
final Consumer<Throwable> chomper
)
{
if (closeable == null) {
return;
}
try {
closeable.close();
}
catch (Throwable e) {
chomper.accept(e);
} }
} }

View File

@ -0,0 +1,465 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.utils;
import com.google.common.base.Throwables;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableCauseMatcher;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
public class CloseableUtilsTest
{
private final TestCloseable quietCloseable = new TestCloseable(null);
private final TestCloseable quietCloseable2 = new TestCloseable(null);
private final TestCloseable ioExceptionCloseable = new TestCloseable(new IOException());
private final TestCloseable runtimeExceptionCloseable = new TestCloseable(new IllegalArgumentException());
private final TestCloseable assertionErrorCloseable = new TestCloseable(new AssertionError());
// For closeAndSuppressException tests.
private final AtomicLong chomped = new AtomicLong();
private final Consumer<Throwable> chomper = e -> chomped.incrementAndGet();
@Test
public void test_closeAll_array_quiet() throws IOException
{
CloseableUtils.closeAll(quietCloseable, null, quietCloseable2);
assertClosed(quietCloseable, quietCloseable2);
}
@Test
public void test_closeAll_list_quiet() throws IOException
{
CloseableUtils.closeAll(Arrays.asList(quietCloseable, null, quietCloseable2));
assertClosed(quietCloseable, quietCloseable2);
}
@Test
public void test_closeAll_array_loud()
{
Exception e = null;
try {
CloseableUtils.closeAll(quietCloseable, null, ioExceptionCloseable, quietCloseable2, runtimeExceptionCloseable);
}
catch (Exception e2) {
e = e2;
}
assertClosed(quietCloseable, ioExceptionCloseable, quietCloseable2, runtimeExceptionCloseable);
// First exception
Assert.assertThat(e, CoreMatchers.instanceOf(IOException.class));
// Second exception
Assert.assertEquals(1, e.getSuppressed().length);
Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IllegalArgumentException.class));
}
@Test
public void test_closeAll_list_loud()
{
Exception e = null;
try {
CloseableUtils.closeAll(
Arrays.asList(
quietCloseable,
null,
ioExceptionCloseable,
quietCloseable2,
runtimeExceptionCloseable
)
);
}
catch (Exception e2) {
e = e2;
}
assertClosed(quietCloseable, ioExceptionCloseable, quietCloseable2, runtimeExceptionCloseable);
// First exception
Assert.assertThat(e, CoreMatchers.instanceOf(IOException.class));
// Second exception
Assert.assertEquals(1, e.getSuppressed().length);
Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IllegalArgumentException.class));
}
@Test
public void test_closeAndWrapExceptions_null()
{
CloseableUtils.closeAndWrapExceptions(null);
// Nothing happens.
}
@Test
public void test_closeAndWrapExceptions_quiet()
{
CloseableUtils.closeAndWrapExceptions(quietCloseable);
assertClosed(quietCloseable);
}
@Test
public void test_closeAndWrapExceptions_ioException()
{
Exception e = null;
try {
CloseableUtils.closeAndWrapExceptions(ioExceptionCloseable);
}
catch (Exception e1) {
e = e1;
}
assertClosed(ioExceptionCloseable);
Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class));
}
@Test
public void test_closeAndWrapExceptions_runtimeException()
{
Exception e = null;
try {
CloseableUtils.closeAndWrapExceptions(runtimeExceptionCloseable);
}
catch (Exception e1) {
e = e1;
}
assertClosed(runtimeExceptionCloseable);
Assert.assertThat(e, CoreMatchers.instanceOf(IllegalArgumentException.class));
}
@Test
public void test_closeAndWrapExceptions_assertionError()
{
Throwable e = null;
try {
CloseableUtils.closeAndWrapExceptions(assertionErrorCloseable);
}
catch (Throwable e1) {
e = e1;
}
assertClosed(assertionErrorCloseable);
Assert.assertThat(e, CoreMatchers.instanceOf(AssertionError.class));
}
@Test
public void test_closeAndSuppressExceptions_null()
{
CloseableUtils.closeAndSuppressExceptions(null, chomper);
Assert.assertEquals(0, chomped.get());
}
@Test
public void test_closeAndSuppressExceptions_quiet()
{
CloseableUtils.closeAndSuppressExceptions(quietCloseable, chomper);
assertClosed(quietCloseable);
Assert.assertEquals(0, chomped.get());
}
@Test
public void test_closeAndSuppressExceptions_ioException()
{
CloseableUtils.closeAndSuppressExceptions(ioExceptionCloseable, chomper);
assertClosed(ioExceptionCloseable);
Assert.assertEquals(1, chomped.get());
}
@Test
public void test_closeAndSuppressExceptions_runtimeException()
{
CloseableUtils.closeAndSuppressExceptions(runtimeExceptionCloseable, chomper);
assertClosed(runtimeExceptionCloseable);
Assert.assertEquals(1, chomped.get());
}
@Test
public void test_closeAndSuppressExceptions_assertionError()
{
CloseableUtils.closeAndSuppressExceptions(assertionErrorCloseable, chomper);
assertClosed(assertionErrorCloseable);
Assert.assertEquals(1, chomped.get());
}
@Test
public void test_closeInCatch_improper()
{
Exception e = null;
try {
//noinspection ThrowableNotThrown
CloseableUtils.closeInCatch(null, quietCloseable);
}
catch (Exception e1) {
e = e1;
}
Assert.assertTrue(quietCloseable.isClosed());
Assert.assertThat(e, CoreMatchers.instanceOf(IllegalStateException.class));
Assert.assertThat(
e,
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Must be called with non-null caught exception"))
);
}
@Test
public void test_closeInCatch_quiet()
{
Exception e = null;
try {
//noinspection ThrowableNotThrown
CloseableUtils.closeInCatch(new RuntimeException("this one was caught"), quietCloseable);
}
catch (Exception e1) {
e = e1;
}
Assert.assertTrue(quietCloseable.isClosed());
Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class));
Assert.assertThat(
e,
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught"))
);
}
@Test
public void test_closeInCatch_ioException()
{
Exception e = null;
try {
//noinspection ThrowableNotThrown
CloseableUtils.closeInCatch(new IOException("this one was caught"), ioExceptionCloseable);
}
catch (Exception e1) {
e = e1;
}
Assert.assertTrue(ioExceptionCloseable.isClosed());
// First exception
Assert.assertThat(e, CoreMatchers.instanceOf(IOException.class));
Assert.assertThat(
e,
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught"))
);
// Second exception
Assert.assertEquals(1, e.getSuppressed().length);
Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IOException.class));
}
@Test
public void test_closeInCatch_runtimeException()
{
Exception e = null;
try {
//noinspection ThrowableNotThrown
CloseableUtils.closeInCatch(new RuntimeException("this one was caught"), runtimeExceptionCloseable);
}
catch (Exception e1) {
e = e1;
}
Assert.assertTrue(runtimeExceptionCloseable.isClosed());
// First exception
Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class));
Assert.assertThat(
e,
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught"))
);
// Second exception
Assert.assertEquals(1, e.getSuppressed().length);
Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IllegalArgumentException.class));
}
@Test
public void test_closeAndWrapInCatch_improper()
{
Exception e = null;
try {
//noinspection ThrowableNotThrown
CloseableUtils.closeAndWrapInCatch(null, quietCloseable);
}
catch (Exception e1) {
e = e1;
}
Assert.assertTrue(quietCloseable.isClosed());
Assert.assertThat(e, CoreMatchers.instanceOf(IllegalStateException.class));
Assert.assertThat(
e,
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Must be called with non-null caught exception"))
);
}
@Test
public void test_closeAndWrapInCatch_quiet()
{
Exception e = null;
try {
//noinspection ThrowableNotThrown
CloseableUtils.closeAndWrapInCatch(new RuntimeException("this one was caught"), quietCloseable);
}
catch (Exception e1) {
e = e1;
}
Assert.assertTrue(quietCloseable.isClosed());
Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class));
Assert.assertThat(
e,
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught"))
);
}
@Test
public void test_closeAndWrapInCatch_ioException()
{
Exception e = null;
try {
//noinspection ThrowableNotThrown
CloseableUtils.closeAndWrapInCatch(new IOException("this one was caught"), ioExceptionCloseable);
}
catch (Exception e1) {
e = e1;
}
Assert.assertTrue(ioExceptionCloseable.isClosed());
// First exception
Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class));
Assert.assertThat(
e,
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("java.io.IOException: this one was caught"))
);
Assert.assertThat(e, ThrowableCauseMatcher.hasCause(CoreMatchers.instanceOf(IOException.class)));
Assert.assertThat(
e,
ThrowableCauseMatcher.hasCause(
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught"))
)
);
// Second exception
Assert.assertEquals(1, e.getCause().getSuppressed().length);
Assert.assertThat(e.getCause().getSuppressed()[0], CoreMatchers.instanceOf(IOException.class));
}
@Test
public void test_closeAndWrapInCatch_runtimeException()
{
Exception e = null;
try {
//noinspection ThrowableNotThrown
CloseableUtils.closeAndWrapInCatch(new RuntimeException("this one was caught"), runtimeExceptionCloseable);
}
catch (Exception e1) {
e = e1;
}
Assert.assertTrue(runtimeExceptionCloseable.isClosed());
// First exception
Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class));
Assert.assertThat(
e,
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught"))
);
// Second exception
Assert.assertEquals(1, e.getSuppressed().length);
Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(IllegalArgumentException.class));
}
@Test
public void test_closeAndWrapInCatch_assertionError()
{
Exception e = null;
try {
//noinspection ThrowableNotThrown
CloseableUtils.closeAndWrapInCatch(new RuntimeException("this one was caught"), assertionErrorCloseable);
}
catch (Exception e1) {
e = e1;
}
Assert.assertTrue(assertionErrorCloseable.isClosed());
// First exception
Assert.assertThat(e, CoreMatchers.instanceOf(RuntimeException.class));
Assert.assertThat(
e,
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("this one was caught"))
);
// Second exception
Assert.assertEquals(1, e.getSuppressed().length);
Assert.assertThat(e.getSuppressed()[0], CoreMatchers.instanceOf(AssertionError.class));
}
private static void assertClosed(final TestCloseable... closeables)
{
for (TestCloseable closeable : closeables) {
Assert.assertTrue(closeable.isClosed());
}
}
private static class TestCloseable implements Closeable
{
@Nullable
private final Throwable e;
private final AtomicBoolean closed = new AtomicBoolean(false);
TestCloseable(@Nullable Throwable e)
{
this.e = e;
}
@Override
public void close() throws IOException
{
closed.set(true);
if (e != null) {
Throwables.propagateIfInstanceOf(e, IOException.class);
throw Throwables.propagate(e);
}
}
public boolean isClosed()
{
return closed.get();
}
}
}

View File

@ -25,9 +25,9 @@ import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.guice.annotations.Self; import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.DruidNode; import org.apache.druid.server.DruidNode;
import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -45,7 +45,13 @@ public class K8sDruidLeaderSelector implements DruidLeaderSelector
@SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT", justification = "incremented but in single thread") @SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT", justification = "incremented but in single thread")
private volatile int term = 0; private volatile int term = 0;
public K8sDruidLeaderSelector(@Self DruidNode self, String lockResourceName, String lockResourceNamespace, K8sDiscoveryConfig discoveryConfig, K8sLeaderElectorFactory k8sLeaderElectorFactory) public K8sDruidLeaderSelector(
@Self DruidNode self,
String lockResourceName,
String lockResourceNamespace,
K8sDiscoveryConfig discoveryConfig,
K8sLeaderElectorFactory k8sLeaderElectorFactory
)
{ {
this.leaderLatch = new LeaderElectorAsyncWrapper( this.leaderLatch = new LeaderElectorAsyncWrapper(
self.getServiceScheme() + "://" + self.getHostAndPortToUse(), self.getServiceScheme() + "://" + self.getHostAndPortToUse(),
@ -72,8 +78,7 @@ public class K8sDruidLeaderSelector implements DruidLeaderSelector
} }
catch (Throwable ex) { catch (Throwable ex) {
LOGGER.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); LOGGER.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit();
closeLeaderLatchQuietly();
CloseQuietly.close(leaderLatch);
leader = false; leader = false;
//Exit and Kubernetes would simply create a new replacement pod. //Exit and Kubernetes would simply create a new replacement pod.
System.exit(1); System.exit(1);
@ -147,6 +152,15 @@ public class K8sDruidLeaderSelector implements DruidLeaderSelector
if (!lifecycleLock.canStop()) { if (!lifecycleLock.canStop()) {
throw new ISE("can't stop."); throw new ISE("can't stop.");
} }
CloseQuietly.close(leaderLatch);
closeLeaderLatchQuietly();
}
private void closeLeaderLatchQuietly()
{
CloseableUtils.closeAndSuppressExceptions(
leaderLatch,
e -> LOGGER.warn("Exception caught while cleaning up leader latch")
);
} }
} }

View File

@ -32,11 +32,11 @@ import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.DruidNode; import org.apache.druid.server.DruidNode;
import org.apache.druid.utils.CloseableUtils;
import java.io.Closeable; import java.io.Closeable;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
@ -340,7 +340,8 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
try { try {
LOGGER.info("Stopping NodeRoleWatcher for [%s]...", nodeRole); LOGGER.info("Stopping NodeRoleWatcher for [%s]...", nodeRole);
CloseQuietly.close(watchRef.getAndSet(STOP_MARKER)); // STOP_MARKER cannot throw exceptions on close(), so this is OK.
CloseableUtils.closeAndSuppressExceptions(STOP_MARKER, e -> {});
watchExecutor.shutdownNow(); watchExecutor.shutdownNow();
if (!watchExecutor.awaitTermination(15, TimeUnit.SECONDS)) { if (!watchExecutor.awaitTermination(15, TimeUnit.SECONDS)) {

View File

@ -61,7 +61,6 @@ import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.NoopQueryRunner; import org.apache.druid.query.NoopQueryRunner;
@ -86,6 +85,7 @@ import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import org.apache.druid.segment.realtime.plumber.Committers; import org.apache.druid.segment.realtime.plumber.Committers;
import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec; import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.apache.druid.utils.CloseableUtils;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -428,9 +428,9 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
finally { finally {
toolbox.getChatHandlerProvider().unregister(getId()); toolbox.getChatHandlerProvider().unregister(getId());
CloseQuietly.close(firehose); CloseableUtils.closeAndSuppressExceptions(firehose, e -> log.warn("Failed to close Firehose"));
appenderator.close(); appenderator.close();
CloseQuietly.close(driver); CloseableUtils.closeAndSuppressExceptions(driver, e -> log.warn("Failed to close AppenderatorDriver"));
toolbox.removeMonitor(metricsMonitor); toolbox.removeMonitor(metricsMonitor);

View File

@ -43,7 +43,6 @@ import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.NoopQueryRunner; import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
@ -67,6 +66,7 @@ import org.apache.druid.segment.realtime.plumber.RealtimePlumberSchool;
import org.apache.druid.segment.realtime.plumber.VersioningPolicy; import org.apache.druid.segment.realtime.plumber.VersioningPolicy;
import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -302,7 +302,11 @@ public class RealtimeIndexTask extends AbstractTask
{ {
try { try {
// Side effect: Calling getVersion causes a lock to be acquired // Side effect: Calling getVersion causes a lock to be acquired
final TimeChunkLockAcquireAction action = new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, interval, lockTimeoutMs); final TimeChunkLockAcquireAction action = new TimeChunkLockAcquireAction(
TaskLockType.EXCLUSIVE,
interval,
lockTimeoutMs
);
final TaskLock lock = Preconditions.checkNotNull( final TaskLock lock = Preconditions.checkNotNull(
toolbox.getTaskActionClient().submit(action), toolbox.getTaskActionClient().submit(action),
"Cannot acquire a lock for interval[%s]", "Cannot acquire a lock for interval[%s]",
@ -471,7 +475,7 @@ public class RealtimeIndexTask extends AbstractTask
} }
finally { finally {
if (firehose != null) { if (firehose != null) {
CloseQuietly.close(firehose); CloseableUtils.closeAndSuppressExceptions(firehose, e -> log.warn("Failed to close Firehose"));
} }
toolbox.removeMonitor(metricsMonitor); toolbox.removeMonitor(metricsMonitor);
} }

View File

@ -38,7 +38,6 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.filter.DimFilter;
@ -58,6 +57,7 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.IndexedInts; import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.filter.Filters; import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter; import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
import org.apache.druid.utils.CloseableUtils;
import org.apache.druid.utils.CollectionUtils; import org.apache.druid.utils.CollectionUtils;
import java.io.File; import java.io.File;
@ -208,10 +208,7 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
Closer closer = Closer.create(); CloseableUtils.closeAll(rowYielder, segmentFile);
closer.register(rowYielder);
closer.register(segmentFile);
closer.close();
} }
}; };
} }

View File

@ -61,7 +61,6 @@ import org.apache.druid.indexing.overlord.helpers.OverlordHelperManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.server.DruidNode; import org.apache.druid.server.DruidNode;
@ -70,6 +69,7 @@ import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.utils.CloseableUtils;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.Period; import org.joda.time.Period;
import org.junit.After; import org.junit.After;
@ -123,8 +123,8 @@ public class OverlordTest
private void tearDownServerAndCurator() private void tearDownServerAndCurator()
{ {
CloseQuietly.close(curator); CloseableUtils.closeAndWrapExceptions(curator);
CloseQuietly.close(server); CloseableUtils.closeAndWrapExceptions(server);
} }
@Before @Before

View File

@ -21,13 +21,12 @@ package org.apache.druid.guice;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Module; import com.google.inject.Module;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import javax.annotation.Nullable;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
@ -56,15 +55,7 @@ public class PropertiesModule implements Module
props.putAll(systemProps); props.putAll(systemProps);
for (String propertiesFile : propertiesFiles) { for (String propertiesFile : propertiesFiles) {
InputStream stream = ClassLoader.getSystemResourceAsStream(propertiesFile); try (InputStream stream = openPropertiesFile(propertiesFile, systemProps)) {
try {
if (stream == null) {
File workingDirectoryFile = new File(systemProps.getProperty("druid.properties.file", propertiesFile));
if (workingDirectoryFile.exists()) {
stream = new BufferedInputStream(new FileInputStream(workingDirectoryFile));
}
}
if (stream != null) { if (stream != null) {
log.debug("Loading properties from %s", propertiesFile); log.debug("Loading properties from %s", propertiesFile);
try (final InputStreamReader in = new InputStreamReader(stream, StandardCharsets.UTF_8)) { try (final InputStreamReader in = new InputStreamReader(stream, StandardCharsets.UTF_8)) {
@ -75,14 +66,29 @@ public class PropertiesModule implements Module
} }
} }
} }
catch (FileNotFoundException e) { catch (IOException e) {
log.error(e, "This can only happen if the .exists() call lied."); throw new RuntimeException(e);
}
finally {
CloseQuietly.close(stream);
} }
} }
binder.bind(Properties.class).toInstance(props); binder.bind(Properties.class).toInstance(props);
} }
@Nullable
private static InputStream openPropertiesFile(final String propertiesFile, final Properties systemProps)
throws IOException
{
final InputStream stream = ClassLoader.getSystemResourceAsStream(propertiesFile);
if (stream != null) {
return stream;
} else {
File workingDirectoryFile = new File(systemProps.getProperty("druid.properties.file", propertiesFile));
if (workingDirectoryFile.exists()) {
return new BufferedInputStream(new FileInputStream(workingDirectoryFile));
} else {
return null;
}
}
}
} }

View File

@ -33,7 +33,6 @@ import org.apache.druid.guice.annotations.Global;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.FunctionalIterator; import org.apache.druid.java.util.common.guava.FunctionalIterator;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Sequences;
@ -50,11 +49,11 @@ import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts; import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.filter.Filters; import org.apache.druid.segment.filter.Filters;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.Closeable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -66,6 +65,7 @@ import java.util.NoSuchElementException;
import java.util.TreeMap; import java.util.TreeMap;
/** /**
*
*/ */
public class GroupByQueryEngine public class GroupByQueryEngine
{ {
@ -131,21 +131,14 @@ public class GroupByQueryEngine
@Override @Override
public void cleanup(RowIterator iterFromMake) public void cleanup(RowIterator iterFromMake)
{ {
CloseQuietly.close(iterFromMake); CloseableUtils.closeAndWrapExceptions(iterFromMake);
} }
} }
); );
} }
} }
), ),
new Closeable() bufferHolder
{
@Override
public void close()
{
CloseQuietly.close(bufferHolder);
}
}
) )
); );
} }

View File

@ -37,7 +37,6 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.collect.Utils; import org.apache.druid.java.util.common.collect.Utils;
import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.LazySequence; import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Sequences;
@ -72,6 +71,7 @@ import org.apache.druid.query.groupby.resource.GroupByQueryResource;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.utils.CloseableUtils;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
@ -455,9 +455,8 @@ public class GroupByStrategyV2 implements GroupByStrategy
finalResultSupplier finalResultSupplier
); );
} }
catch (Exception ex) { catch (Throwable e) {
CloseQuietly.close(resultSupplier); throw CloseableUtils.closeAndWrapInCatch(e, resultSupplier);
throw ex;
} }
} }
@ -591,9 +590,8 @@ public class GroupByStrategyV2 implements GroupByStrategy
resultSupplierOne //this will close resources allocated by resultSupplierOne after sequence read resultSupplierOne //this will close resources allocated by resultSupplierOne after sequence read
); );
} }
catch (Exception ex) { catch (Throwable e) {
CloseQuietly.close(resultSupplierOne); throw CloseableUtils.closeAndWrapInCatch(e, resultSupplierOne);
throw ex;
} }
} }
@ -613,16 +611,17 @@ public class GroupByStrategyV2 implements GroupByStrategy
new LazySequence<>( new LazySequence<>(
() -> Sequences.withBaggage( () -> Sequences.withBaggage(
memoizedSupplier.get().results(dimsToInclude), memoizedSupplier.get().results(dimsToInclude),
closeOnSequenceRead ? () -> CloseQuietly.close(memoizedSupplier.get()) : () -> {} closeOnSequenceRead
? () -> CloseableUtils.closeAndWrapExceptions(memoizedSupplier.get())
: () -> {}
) )
), ),
subtotalQuery, subtotalQuery,
null null
); );
} }
catch (Exception ex) { catch (Throwable e) {
CloseQuietly.close(baseResultsSupplier.get()); throw CloseableUtils.closeAndWrapInCatch(e, baseResultsSupplier.get());
throw ex;
} }
} }

View File

@ -27,7 +27,6 @@ import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.GenericQueryMetricsFactory; import org.apache.druid.query.GenericQueryMetricsFactory;
@ -39,6 +38,7 @@ import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.utils.CloseableUtils;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -108,7 +108,7 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
@Override @Override
public void cleanup(ScanQueryLimitRowIterator iterFromMake) public void cleanup(ScanQueryLimitRowIterator iterFromMake)
{ {
CloseQuietly.close(iterFromMake); CloseableUtils.closeAndWrapExceptions(iterFromMake);
} }
}); });
} }

View File

@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.ResourceHolder; import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.query.BaseQuery; import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.ColumnSelectorPlus; import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
@ -43,6 +42,7 @@ import org.apache.druid.segment.historical.HistoricalColumnSelector;
import org.apache.druid.segment.historical.HistoricalCursor; import org.apache.druid.segment.historical.HistoricalCursor;
import org.apache.druid.segment.historical.HistoricalDimensionSelector; import org.apache.druid.segment.historical.HistoricalDimensionSelector;
import org.apache.druid.segment.historical.SingleValueHistoricalDimensionSelector; import org.apache.druid.segment.historical.SingleValueHistoricalDimensionSelector;
import org.apache.druid.utils.CloseableUtils;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
@ -784,7 +784,7 @@ public class PooledTopNAlgorithm
if (resultsBufHolder != null) { if (resultsBufHolder != null) {
resultsBufHolder.get().clear(); resultsBufHolder.get().clear();
} }
CloseQuietly.close(resultsBufHolder); CloseableUtils.closeAndWrapExceptions(resultsBufHolder);
} }
} }

View File

@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.column.BaseColumn; import org.apache.druid.segment.column.BaseColumn;
@ -38,6 +37,7 @@ import org.apache.druid.segment.data.ImmutableBitmapValues;
import org.apache.druid.segment.data.IndexedIterable; import org.apache.druid.segment.data.IndexedIterable;
import org.apache.druid.segment.selector.settable.SettableColumnValueSelector; import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import org.apache.druid.segment.selector.settable.SettableLongColumnValueSelector; import org.apache.druid.segment.selector.settable.SettableLongColumnValueSelector;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -51,6 +51,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
/** /**
*
*/ */
public class QueryableIndexIndexableAdapter implements IndexableAdapter public class QueryableIndexIndexableAdapter implements IndexableAdapter
{ {
@ -279,7 +280,7 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
@Override @Override
public void close() public void close()
{ {
CloseQuietly.close(closer); CloseableUtils.closeAndWrapExceptions(closer);
} }
@Override @Override

View File

@ -21,7 +21,6 @@ package org.apache.druid.segment.column;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.base.Predicates; import com.google.common.base.Predicates;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.ValueMatcher; import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
@ -43,6 +42,7 @@ import org.apache.druid.segment.vector.ReadableVectorInspector;
import org.apache.druid.segment.vector.ReadableVectorOffset; import org.apache.druid.segment.vector.ReadableVectorOffset;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import org.apache.druid.segment.vector.VectorObjectSelector; import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
@ -607,13 +607,6 @@ public class StringDictionaryEncodedColumn implements DictionaryEncodedColumn<St
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
CloseQuietly.close(cachedDictionary); CloseableUtils.closeAll(cachedDictionary, column, multiValueColumn);
if (column != null) {
column.close();
}
if (multiValueColumn != null) {
multiValueColumn.close();
}
} }
} }

View File

@ -21,7 +21,6 @@ package org.apache.druid.segment.data;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import org.apache.druid.collections.ResourceHolder; import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
@ -168,7 +167,9 @@ public class BlockLayoutColumnarDoublesSupplier implements Supplier<ColumnarDoub
protected void loadBuffer(int bufferNum) protected void loadBuffer(int bufferNum)
{ {
CloseQuietly.close(holder); if (holder != null) {
holder.close();
}
holder = singleThreadedDoubleBuffers.get(bufferNum); holder = singleThreadedDoubleBuffers.get(bufferNum);
// asDoubleBuffer() makes the doubleBuffer's position = 0 // asDoubleBuffer() makes the doubleBuffer's position = 0
doubleBuffer = holder.get().asDoubleBuffer(); doubleBuffer = holder.get().asDoubleBuffer();

View File

@ -21,7 +21,6 @@ package org.apache.druid.segment.data;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import org.apache.druid.collections.ResourceHolder; import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
@ -168,7 +167,9 @@ public class BlockLayoutColumnarFloatsSupplier implements Supplier<ColumnarFloat
protected void loadBuffer(int bufferNum) protected void loadBuffer(int bufferNum)
{ {
CloseQuietly.close(holder); if (holder != null) {
holder.close();
}
holder = singleThreadedFloatBuffers.get(bufferNum); holder = singleThreadedFloatBuffers.get(bufferNum);
// asFloatBuffer() makes the floatBuffer's position = 0 // asFloatBuffer() makes the floatBuffer's position = 0
floatBuffer = holder.get().asFloatBuffer(); floatBuffer = holder.get().asFloatBuffer();

View File

@ -21,7 +21,6 @@ package org.apache.druid.segment.data;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import org.apache.druid.collections.ResourceHolder; import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
@ -83,7 +82,9 @@ public class BlockLayoutColumnarLongsSupplier implements Supplier<ColumnarLongs>
@Override @Override
protected void loadBuffer(int bufferNum) protected void loadBuffer(int bufferNum)
{ {
CloseQuietly.close(holder); if (holder != null) {
holder.close();
}
holder = singleThreadedLongBuffers.get(bufferNum); holder = singleThreadedLongBuffers.get(bufferNum);
buffer = holder.get(); buffer = holder.get();
// asLongBuffer() makes the longBuffer's position = 0 // asLongBuffer() makes the longBuffer's position = 0
@ -190,7 +191,9 @@ public class BlockLayoutColumnarLongsSupplier implements Supplier<ColumnarLongs>
protected void loadBuffer(int bufferNum) protected void loadBuffer(int bufferNum)
{ {
CloseQuietly.close(holder); if (holder != null) {
holder.close();
}
holder = singleThreadedLongBuffers.get(bufferNum); holder = singleThreadedLongBuffers.get(bufferNum);
buffer = holder.get(); buffer = holder.get();
currBufferNum = bufferNum; currBufferNum = bufferNum;

View File

@ -24,7 +24,6 @@ import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.apache.druid.collections.ResourceHolder; import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
@ -134,7 +133,11 @@ public class CompressedColumnarIntsSupplier implements WritableSupplier<Columnar
throw new IAE("Unknown version[%s]", versionFromBuffer); throw new IAE("Unknown version[%s]", versionFromBuffer);
} }
public static CompressedColumnarIntsSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order, SmooshedFileMapper mapper) public static CompressedColumnarIntsSupplier fromByteBuffer(
ByteBuffer buffer,
ByteOrder order,
SmooshedFileMapper mapper
)
{ {
byte versionFromBuffer = buffer.get(); byte versionFromBuffer = buffer.get();
@ -292,7 +295,9 @@ public class CompressedColumnarIntsSupplier implements WritableSupplier<Columnar
int currBufferNum = -1; int currBufferNum = -1;
ResourceHolder<ByteBuffer> holder; ResourceHolder<ByteBuffer> holder;
/** buffer's position must be 0 */ /**
* buffer's position must be 0
*/
IntBuffer buffer; IntBuffer buffer;
@Override @Override
@ -317,7 +322,9 @@ public class CompressedColumnarIntsSupplier implements WritableSupplier<Columnar
protected void loadBuffer(int bufferNum) protected void loadBuffer(int bufferNum)
{ {
CloseQuietly.close(holder); if (holder != null) {
holder.close();
}
holder = singleThreadedIntBuffers.get(bufferNum); holder = singleThreadedIntBuffers.get(bufferNum);
// asIntBuffer() makes the buffer's position = 0 // asIntBuffer() makes the buffer's position = 0
buffer = holder.get().asIntBuffer(); buffer = holder.get().asIntBuffer();

View File

@ -25,7 +25,6 @@ import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.collections.ResourceHolder; import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.common.utils.ByteUtils; import org.apache.druid.common.utils.ByteUtils;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
@ -433,7 +432,9 @@ public class CompressedVSizeColumnarIntsSupplier implements WritableSupplier<Col
protected void loadBuffer(int bufferNum) protected void loadBuffer(int bufferNum)
{ {
CloseQuietly.close(holder); if (holder != null) {
holder.close();
}
holder = singleThreadedBuffers.get(bufferNum); holder = singleThreadedBuffers.get(bufferNum);
ByteBuffer bb = holder.get(); ByteBuffer bb = holder.get();
ByteOrder byteOrder = bb.order(); ByteOrder byteOrder = bb.order();

View File

@ -26,7 +26,6 @@ import org.apache.druid.common.utils.SerializerUtils;
import org.apache.druid.io.Channels; import org.apache.druid.io.Channels;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
@ -35,6 +34,7 @@ import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.serde.MetaSerdeHelper; import org.apache.druid.segment.serde.MetaSerdeHelper;
import org.apache.druid.segment.serde.Serializer; import org.apache.druid.segment.serde.Serializer;
import org.apache.druid.segment.writeout.HeapByteBufferWriteOutBytes; import org.apache.druid.segment.writeout.HeapByteBufferWriteOutBytes;
import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.Closeable; import java.io.Closeable;
@ -556,13 +556,13 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
headerOut.writeInt(Ints.checkedCast(valuesOut.size())); headerOut.writeInt(Ints.checkedCast(valuesOut.size()));
if (prevVal instanceof Closeable) { if (prevVal instanceof Closeable) {
CloseQuietly.close((Closeable) prevVal); CloseableUtils.closeAndWrapExceptions((Closeable) prevVal);
} }
prevVal = next; prevVal = next;
} while (objects.hasNext()); } while (objects.hasNext());
if (prevVal instanceof Closeable) { if (prevVal instanceof Closeable) {
CloseQuietly.close((Closeable) prevVal); CloseableUtils.closeAndWrapExceptions((Closeable) prevVal);
} }
} }
catch (IOException e) { catch (IOException e) {

View File

@ -20,14 +20,15 @@
package org.apache.druid.segment.join; package org.apache.druid.segment.join;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.filter.Filter; import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis; import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -43,6 +44,8 @@ import java.util.Optional;
*/ */
public class HashJoinSegment implements SegmentReference public class HashJoinSegment implements SegmentReference
{ {
private static final Logger log = new Logger(HashJoinSegment.class);
private final SegmentReference baseSegment; private final SegmentReference baseSegment;
private final Filter baseFilter; private final Filter baseFilter;
private final List<JoinableClause> clauses; private final List<JoinableClause> clauses;
@ -131,14 +134,16 @@ public class HashJoinSegment implements SegmentReference
}).orElse(true); }).orElse(true);
} }
if (acquireFailed) { if (acquireFailed) {
CloseQuietly.close(closer); CloseableUtils.closeAndWrapExceptions(closer);
return Optional.empty(); return Optional.empty();
} else { } else {
return Optional.of(closer); return Optional.of(closer);
} }
} }
catch (Exception ex) { catch (Throwable e) {
CloseQuietly.close(closer); // acquireReferences is not permitted to throw exceptions.
CloseableUtils.closeAndSuppressExceptions(closer, e::addSuppressed);
log.warn(e, "Exception encountered while trying to acquire reference");
return Optional.empty(); return Optional.empty();
} }
} }

View File

@ -41,7 +41,6 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielder;
@ -85,6 +84,7 @@ import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CloseableUtils;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import java.io.Closeable; import java.io.Closeable;
@ -712,7 +712,7 @@ public class AggregationTestHelper implements Closeable
} }
finally { finally {
for (Segment segment : segments) { for (Segment segment : segments) {
CloseQuietly.close(segment); CloseableUtils.closeAndWrapExceptions(segment);
} }
} }
} }

View File

@ -28,7 +28,6 @@ import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.Row; import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.NoopInputRowParser; import org.apache.druid.data.input.impl.NoopInputRowParser;
import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.Druids; import org.apache.druid.query.Druids;
import org.apache.druid.query.Result; import org.apache.druid.query.Result;
@ -41,6 +40,7 @@ import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.Segment; import org.apache.druid.segment.Segment;
import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
import org.junit.After; import org.junit.After;
@ -133,7 +133,7 @@ public class StringColumnAggregationTest
{ {
if (segments != null) { if (segments != null) {
for (Segment seg : segments) { for (Segment seg : segments) {
CloseQuietly.close(seg); CloseableUtils.closeAndWrapExceptions(seg);
} }
} }
} }

View File

@ -26,7 +26,6 @@ import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.Smoosh; import org.apache.druid.java.util.common.io.smoosh.Smoosh;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
@ -35,6 +34,7 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.WriteOutBytes; import org.apache.druid.segment.writeout.WriteOutBytes;
import org.apache.druid.utils.CloseableUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -226,7 +226,7 @@ public class CompressedColumnarIntsSerializerTest
for (int i = 0; i < vals.length; ++i) { for (int i = 0; i < vals.length; ++i) {
Assert.assertEquals(vals[i], columnarInts.get(i)); Assert.assertEquals(vals[i], columnarInts.get(i));
} }
CloseQuietly.close(columnarInts); CloseableUtils.closeAndWrapExceptions(columnarInts);
} }
private void checkV2SerializedSizeAndData(int chunkFactor) throws Exception private void checkV2SerializedSizeAndData(int chunkFactor) throws Exception
@ -269,7 +269,7 @@ public class CompressedColumnarIntsSerializerTest
for (int i = 0; i < vals.length; ++i) { for (int i = 0; i < vals.length; ++i) {
Assert.assertEquals(vals[i], columnarInts.get(i)); Assert.assertEquals(vals[i], columnarInts.get(i));
} }
CloseQuietly.close(columnarInts); CloseableUtils.closeAndWrapExceptions(columnarInts);
mapper.close(); mapper.close();
} }
} }

View File

@ -22,9 +22,9 @@ package org.apache.druid.segment.data;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import it.unimi.dsi.fastutil.ints.IntArrays; import it.unimi.dsi.fastutil.ints.IntArrays;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.segment.CompressedPools; import org.apache.druid.segment.CompressedPools;
import org.apache.druid.utils.CloseableUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -58,7 +58,7 @@ public class CompressedColumnarIntsSupplierTest extends CompressionStrategyTest
public void setUp() public void setUp()
{ {
closer = Closer.create(); closer = Closer.create();
CloseQuietly.close(columnarInts); CloseableUtils.closeAndWrapExceptions(columnarInts);
columnarInts = null; columnarInts = null;
supplier = null; supplier = null;
vals = null; vals = null;
@ -68,12 +68,12 @@ public class CompressedColumnarIntsSupplierTest extends CompressionStrategyTest
public void tearDown() throws Exception public void tearDown() throws Exception
{ {
closer.close(); closer.close();
CloseQuietly.close(columnarInts); CloseableUtils.closeAndWrapExceptions(columnarInts);
} }
private void setupSimple(final int chunkSize) private void setupSimple(final int chunkSize)
{ {
CloseQuietly.close(columnarInts); CloseableUtils.closeAndWrapExceptions(columnarInts);
vals = new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16}; vals = new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16};
@ -97,7 +97,7 @@ public class CompressedColumnarIntsSupplierTest extends CompressionStrategyTest
private void makeWithSerde(final int chunkSize) throws IOException private void makeWithSerde(final int chunkSize) throws IOException
{ {
CloseQuietly.close(columnarInts); CloseableUtils.closeAndWrapExceptions(columnarInts);
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
final CompressedColumnarIntsSupplier theSupplier = CompressedColumnarIntsSupplier.fromIntBuffer( final CompressedColumnarIntsSupplier theSupplier = CompressedColumnarIntsSupplier.fromIntBuffer(
@ -271,7 +271,7 @@ public class CompressedColumnarIntsSupplierTest extends CompressionStrategyTest
stopLatch.await(); stopLatch.await();
} }
finally { finally {
CloseQuietly.close(columnarInts2); CloseableUtils.closeAndWrapExceptions(columnarInts2);
} }
if (failureHappened.get()) { if (failureHappened.get()) {

View File

@ -23,10 +23,10 @@ import com.google.common.base.Supplier;
import com.google.common.primitives.Doubles; import com.google.common.primitives.Doubles;
import it.unimi.dsi.fastutil.ints.IntArrays; import it.unimi.dsi.fastutil.ints.IntArrays;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.utils.CloseableUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Rule; import org.junit.Rule;
@ -321,7 +321,7 @@ public class CompressedDoublesSerdeTest
stopLatch.await(); stopLatch.await();
} }
finally { finally {
CloseQuietly.close(indexed2); CloseableUtils.closeAndWrapExceptions(indexed2);
} }
if (failureHappened.get()) { if (failureHappened.get()) {

View File

@ -23,10 +23,10 @@ import com.google.common.base.Supplier;
import com.google.common.primitives.Floats; import com.google.common.primitives.Floats;
import it.unimi.dsi.fastutil.ints.IntArrays; import it.unimi.dsi.fastutil.ints.IntArrays;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.utils.CloseableUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Rule; import org.junit.Rule;
@ -78,8 +78,29 @@ public class CompressedFloatsSerdeTest
private final float[] values2 = {13.2f, 6.1f, 0.001f, 123f, 12572f, 123.1f, 784.4f, 6892.8634f, 8.341111f}; private final float[] values2 = {13.2f, 6.1f, 0.001f, 123f, 12572f, 123.1f, 784.4f, 6892.8634f, 8.341111f};
private final float[] values3 = {0.001f, 0.001f, 0.001f, 0.001f, 0.001f, 100f, 100f, 100f, 100f, 100f}; private final float[] values3 = {0.001f, 0.001f, 0.001f, 0.001f, 0.001f, 100f, 100f, 100f, 100f, 100f};
private final float[] values4 = {0f, 0f, 0f, 0f, 0.01f, 0f, 0f, 0f, 21.22f, 0f, 0f, 0f, 0f, 0f, 0f}; private final float[] values4 = {0f, 0f, 0f, 0f, 0.01f, 0f, 0f, 0f, 21.22f, 0f, 0f, 0f, 0f, 0f, 0f};
private final float[] values5 = {123.16f, 1.12f, 62.00f, 462.12f, 517.71f, 56.54f, 971.32f, 824.22f, 472.12f, 625.26f}; private final float[] values5 = {
private final float[] values6 = {1000000f, 1000001f, 1000002f, 1000003f, 1000004f, 1000005f, 1000006f, 1000007f, 1000008f}; 123.16f,
1.12f,
62.00f,
462.12f,
517.71f,
56.54f,
971.32f,
824.22f,
472.12f,
625.26f
};
private final float[] values6 = {
1000000f,
1000001f,
1000002f,
1000003f,
1000004f,
1000005f,
1000006f,
1000007f,
1000008f
};
private final float[] values7 = { private final float[] values7 = {
Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY, 12378.5734f, -12718243.7496f, -93653653.1f, 12743153.385534f, Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY, 12378.5734f, -12718243.7496f, -93653653.1f, 12743153.385534f,
21431.414538f, 65487435436632.123f, -43734526234564.65f 21431.414538f, 65487435436632.123f, -43734526234564.65f
@ -321,7 +342,7 @@ public class CompressedFloatsSerdeTest
stopLatch.await(); stopLatch.await();
} }
finally { finally {
CloseQuietly.close(indexed2); CloseableUtils.closeAndWrapExceptions(indexed2);
} }
if (failureHappened.get()) { if (failureHappened.get()) {

View File

@ -23,10 +23,10 @@ import com.google.common.base.Supplier;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import it.unimi.dsi.fastutil.ints.IntArrays; import it.unimi.dsi.fastutil.ints.IntArrays;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.utils.CloseableUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Rule; import org.junit.Rule;
@ -356,7 +356,7 @@ public class CompressedLongsSerdeTest
stopLatch.await(); stopLatch.await();
} }
finally { finally {
CloseQuietly.close(indexed2); CloseableUtils.closeAndWrapExceptions(indexed2);
} }
if (failureHappened.get()) { if (failureHappened.get()) {

View File

@ -25,7 +25,6 @@ import com.google.common.collect.Sets;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.Smoosh; import org.apache.druid.java.util.common.io.smoosh.Smoosh;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
@ -34,6 +33,7 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.WriteOutBytes; import org.apache.druid.segment.writeout.WriteOutBytes;
import org.apache.druid.utils.CloseableUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -154,7 +154,7 @@ public class CompressedVSizeColumnarIntsSerializerTest
for (int i = 0; i < vals.length; ++i) { for (int i = 0; i < vals.length; ++i) {
Assert.assertEquals(vals[i], columnarInts.get(i)); Assert.assertEquals(vals[i], columnarInts.get(i));
} }
CloseQuietly.close(columnarInts); CloseableUtils.closeAndWrapExceptions(columnarInts);
} }
@Test @Test
@ -268,8 +268,7 @@ public class CompressedVSizeColumnarIntsSerializerTest
for (int i = 0; i < vals.length; ++i) { for (int i = 0; i < vals.length; ++i) {
Assert.assertEquals(vals[i], columnarInts.get(i)); Assert.assertEquals(vals[i], columnarInts.get(i));
} }
CloseQuietly.close(columnarInts); CloseableUtils.closeAll(columnarInts, mapper);
mapper.close();
} }
@Test @Test

View File

@ -27,9 +27,9 @@ import com.google.common.primitives.Longs;
import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntArrays; import it.unimi.dsi.fastutil.ints.IntArrays;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.segment.CompressedPools; import org.apache.druid.segment.CompressedPools;
import org.apache.druid.utils.CloseableUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -86,7 +86,7 @@ public class CompressedVSizeColumnarIntsSupplierTest extends CompressionStrategy
public void setUp() public void setUp()
{ {
closer = Closer.create(); closer = Closer.create();
CloseQuietly.close(columnarInts); CloseableUtils.closeAndWrapExceptions(columnarInts);
columnarInts = null; columnarInts = null;
supplier = null; supplier = null;
vals = null; vals = null;
@ -95,13 +95,12 @@ public class CompressedVSizeColumnarIntsSupplierTest extends CompressionStrategy
@After @After
public void tearDown() throws Exception public void tearDown() throws Exception
{ {
CloseQuietly.close(columnarInts); CloseableUtils.closeAll(columnarInts, closer);
closer.close();
} }
private void setupSimple(final int chunkSize) private void setupSimple(final int chunkSize)
{ {
CloseQuietly.close(columnarInts); CloseableUtils.closeAndWrapExceptions(columnarInts);
vals = new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16}; vals = new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16};
@ -126,7 +125,7 @@ public class CompressedVSizeColumnarIntsSupplierTest extends CompressionStrategy
private void makeWithSerde(final int chunkSize) throws IOException private void makeWithSerde(final int chunkSize) throws IOException
{ {
CloseQuietly.close(columnarInts); CloseableUtils.closeAndWrapExceptions(columnarInts);
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
final CompressedVSizeColumnarIntsSupplier theSupplier = CompressedVSizeColumnarIntsSupplier.fromList( final CompressedVSizeColumnarIntsSupplier theSupplier = CompressedVSizeColumnarIntsSupplier.fromList(
@ -212,8 +211,14 @@ public class CompressedVSizeColumnarIntsSupplierTest extends CompressionStrategy
public void testmaxIntsInBuffer() public void testmaxIntsInBuffer()
{ {
Assert.assertEquals(CompressedPools.BUFFER_SIZE, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(1)); Assert.assertEquals(CompressedPools.BUFFER_SIZE, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(1));
Assert.assertEquals(CompressedPools.BUFFER_SIZE / 2, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(2)); Assert.assertEquals(
Assert.assertEquals(CompressedPools.BUFFER_SIZE / 4, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(4)); CompressedPools.BUFFER_SIZE / 2,
CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(2)
);
Assert.assertEquals(
CompressedPools.BUFFER_SIZE / 4,
CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(4)
);
Assert.assertEquals(CompressedPools.BUFFER_SIZE, 0x10000); // nearest power of 2 is 2^14 Assert.assertEquals(CompressedPools.BUFFER_SIZE, 0x10000); // nearest power of 2 is 2^14
Assert.assertEquals(1 << 14, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(3)); Assert.assertEquals(1 << 14, CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForBytes(3));
@ -330,7 +335,7 @@ public class CompressedVSizeColumnarIntsSupplierTest extends CompressionStrategy
stopLatch.await(); stopLatch.await();
} }
finally { finally {
CloseQuietly.close(columnarInts2); CloseableUtils.closeAndWrapExceptions(columnarInts2);
} }
if (failureHappened.get()) { if (failureHappened.get()) {

View File

@ -25,7 +25,6 @@ import com.google.common.collect.Sets;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.Smoosh; import org.apache.druid.java.util.common.io.smoosh.Smoosh;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
@ -34,6 +33,7 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.WriteOutBytes; import org.apache.druid.segment.writeout.WriteOutBytes;
import org.apache.druid.utils.CloseableUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
@ -322,12 +322,17 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
Assert.assertEquals(subVals.get(j), vals.get(i)[j]); Assert.assertEquals(subVals.get(j), vals.get(i)[j]);
} }
} }
CloseQuietly.close(columnarMultiInts); CloseableUtils.closeAll(columnarMultiInts, mapper);
mapper.close();
} }
} }
private void generateV2SerializedSizeAndData(long numRows, int maxValue, int maxValuesPerRow, int offsetChunkFactor, int valueChunkFactor) throws Exception private void generateV2SerializedSizeAndData(
long numRows,
int maxValue,
int maxValuesPerRow,
int offsetChunkFactor,
int valueChunkFactor
) throws Exception
{ {
File tmpDirectory = FileUtils.createTempDir(StringUtils.format( File tmpDirectory = FileUtils.createTempDir(StringUtils.format(
"CompressedVSizeIndexedV3WriterTest_%d_%d", "CompressedVSizeIndexedV3WriterTest_%d_%d",
@ -395,8 +400,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
Assert.assertEquals(subVals.get(j), expected[j]); Assert.assertEquals(subVals.get(j), expected[j]);
} }
} }
CloseQuietly.close(columnarMultiInts); CloseableUtils.closeAll(columnarMultiInts, mapper);
mapper.close();
} }
} }
} }

View File

@ -32,7 +32,6 @@ import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
@ -58,6 +57,7 @@ import org.apache.druid.query.context.ConcurrentResponseContext;
import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContext.Key; import org.apache.druid.query.context.ResponseContext.Key;
import org.apache.druid.server.QueryResource; import org.apache.druid.server.QueryResource;
import org.apache.druid.utils.CloseableUtils;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.HttpChunk; import org.jboss.netty.handler.codec.http.HttpChunk;
@ -518,7 +518,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
@Override @Override
public void cleanup(JsonParserIterator<T> iterFromMake) public void cleanup(JsonParserIterator<T> iterFromMake)
{ {
CloseQuietly.close(iterFromMake); CloseableUtils.closeAndWrapExceptions(iterFromMake);
} }
} }
); );

View File

@ -26,7 +26,6 @@ import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryCapacityExceededException; import org.apache.druid.query.QueryCapacityExceededException;
@ -35,6 +34,7 @@ import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryUnsupportedException; import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.query.ResourceLimitExceededException; import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.Closeable; import java.io.Closeable;
@ -96,7 +96,7 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
return false; return false;
} }
if (jp.getCurrentToken() == JsonToken.END_ARRAY) { if (jp.getCurrentToken() == JsonToken.END_ARRAY) {
CloseQuietly.close(jp); CloseableUtils.closeAndWrapExceptions(jp);
return false; return false;
} }

View File

@ -156,7 +156,7 @@ public class HybridCache implements Cache
@LifecycleStop @LifecycleStop
public void close() throws IOException public void close() throws IOException
{ {
CloseableUtils.closeBoth(level1, level2); CloseableUtils.closeAll(level1, level2);
} }
@Override @Override

View File

@ -31,15 +31,15 @@ import org.apache.curator.utils.ZKPaths;
import org.apache.druid.curator.cache.PathChildrenCacheFactory; import org.apache.druid.curator.cache.PathChildrenCacheFactory;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
@ -135,12 +135,11 @@ public class Announcer
started = false; started = false;
Closer closer = Closer.create();
for (PathChildrenCache cache : listeners.values()) {
closer.register(cache);
}
try { try {
CloseQuietly.close(closer); CloseableUtils.closeAll(listeners.values());
}
catch (IOException e) {
throw new RuntimeException(e);
} }
finally { finally {
pathChildrenCacheExecutor.shutdown(); pathChildrenCacheExecutor.shutdown();
@ -413,9 +412,8 @@ public class Announcer
try { try {
cache.start(); cache.start();
} }
catch (Exception e) { catch (Throwable e) {
CloseQuietly.close(cache); throw CloseableUtils.closeAndWrapInCatch(e, cache);
throw new RuntimeException(e);
} }
} }

View File

@ -30,9 +30,9 @@ import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.DruidNode; import org.apache.druid.server.DruidNode;
import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -40,6 +40,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/** /**
*
*/ */
public class CuratorDruidLeaderSelector implements DruidLeaderSelector public class CuratorDruidLeaderSelector implements DruidLeaderSelector
{ {
@ -100,8 +101,11 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector
log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit();
// give others a chance to become leader. // give others a chance to become leader.
final LeaderLatch oldLatch = createNewLeaderLatchWithListener(); CloseableUtils.closeAndSuppressExceptions(
CloseQuietly.close(oldLatch); createNewLeaderLatchWithListener(),
e -> log.warn("Could not close old leader latch; continuing with new one anyway.")
);
leader = false; leader = false;
try { try {
//Small delay before starting the latch so that others waiting are chosen to become leader. //Small delay before starting the latch so that others waiting are chosen to become leader.
@ -207,7 +211,8 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector
if (!lifecycleLock.canStop()) { if (!lifecycleLock.canStop()) {
throw new ISE("can't stop."); throw new ISE("can't stop.");
} }
CloseQuietly.close(leaderLatch.get());
CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> log.warn(e, "Failed to close LeaderLatch."));
listenerExecutor.shutdownNow(); listenerExecutor.shutdownNow();
} }
} }

View File

@ -155,7 +155,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
closer.registerAll(nodeRoleWatchers.values()); closer.registerAll(nodeRoleWatchers.values());
closer.registerAll(nodeDiscoverers); closer.registerAll(nodeDiscoverers);
CloseableUtils.closeBoth(closer, listenerExecutor::shutdownNow); CloseableUtils.closeAll(closer, listenerExecutor::shutdownNow);
} }
private static class NodeRoleWatcher implements DruidNodeDiscovery, Closeable private static class NodeRoleWatcher implements DruidNodeDiscovery, Closeable
@ -213,7 +213,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
CloseableUtils.closeBoth(cache, cacheExecutor::shutdownNow); CloseableUtils.closeAll(cache, cacheExecutor::shutdownNow);
} }
@Override @Override

View File

@ -32,7 +32,6 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.FunctionalIterable; import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@ -68,6 +67,7 @@ import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.io.Closeable; import java.io.Closeable;
@ -259,9 +259,8 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
); );
return new Pair<>(segmentAndCloseable.lhs.getDataInterval(), runner); return new Pair<>(segmentAndCloseable.lhs.getDataInterval(), runner);
} }
catch (RuntimeException e) { catch (Throwable e) {
CloseQuietly.close(segmentAndCloseable.rhs); throw CloseableUtils.closeAndWrapInCatch(e, segmentAndCloseable.rhs);
throw e;
} }
} }
) )

View File

@ -119,7 +119,7 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowPars
{ {
if (!closed) { if (!closed) {
closed = true; closed = true;
CloseableUtils.closeBoth(firehose, shutdownExec::shutdownNow); CloseableUtils.closeAll(firehose, shutdownExec::shutdownNow);
} }
} }
} }

View File

@ -22,10 +22,11 @@ package org.apache.druid.server.log;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.RequestLogLine; import org.apache.druid.server.RequestLogLine;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Duration; import org.joda.time.Duration;
import org.joda.time.MutableDateTime; import org.joda.time.MutableDateTime;
@ -43,9 +44,12 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
/** /**
*
*/ */
public class FileRequestLogger implements RequestLogger public class FileRequestLogger implements RequestLogger
{ {
private static final Logger log = new Logger(FileRequestLogger.class);
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final ScheduledExecutorService exec; private final ScheduledExecutorService exec;
private final File baseDir; private final File baseDir;
@ -93,7 +97,12 @@ public class FileRequestLogger implements RequestLogger
try { try {
synchronized (lock) { synchronized (lock) {
currentDay = currentDay.plusDays(1); currentDay = currentDay.plusDays(1);
CloseQuietly.close(fileWriter);
CloseableUtils.closeAndSuppressExceptions(
fileWriter,
e -> log.warn("Could not close log file for %s. Creating new log file anyway.", currentDay)
);
fileWriter = getFileWriter(); fileWriter = getFileWriter();
} }
} }
@ -124,7 +133,7 @@ public class FileRequestLogger implements RequestLogger
public void stop() public void stop()
{ {
synchronized (lock) { synchronized (lock) {
CloseQuietly.close(fileWriter); CloseableUtils.closeAndWrapExceptions(fileWriter);
} }
} }

View File

@ -35,6 +35,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
@ -82,10 +83,12 @@ public class HybridCacheTest
} }
@Test @Test
public void testSanity() public void testSanity() throws IOException
{ {
final MapCache l1 = new MapCache(new ByteCountingLRUMap(1024 * 1024)); final ByteCountingLRUMap l1Map = new ByteCountingLRUMap(1024 * 1024);
final MapCache l2 = new MapCache(new ByteCountingLRUMap(1024 * 1024)); final ByteCountingLRUMap l2Map = new ByteCountingLRUMap(1024 * 1024);
final MapCache l1 = new MapCache(l1Map);
final MapCache l2 = new MapCache(l2Map);
HybridCache cache = new HybridCache(new HybridCacheConfig(), l1, l2); HybridCache cache = new HybridCache(new HybridCacheConfig(), l1, l2);
final Cache.NamedKey key1 = new Cache.NamedKey("a", HI); final Cache.NamedKey key1 = new Cache.NamedKey("a", HI);
@ -175,5 +178,10 @@ public class HybridCacheTest
Assert.assertEquals(hits + 1, cache.getStats().getNumHits()); Assert.assertEquals(hits + 1, cache.getStats().getNumHits());
Assert.assertEquals(misses + 1, cache.getStats().getNumMisses()); Assert.assertEquals(misses + 1, cache.getStats().getNumMisses());
} }
// test close
cache.close();
Assert.assertEquals("l1 size after close()", 0, l1Map.size());
Assert.assertEquals("l2 size after close()", 0, l2Map.size());
} }
} }