From 781873ba53a43db50e99e6f21180f748b3c65919 Mon Sep 17 00:00:00 2001 From: Benedict Jin Date: Tue, 20 Aug 2019 17:55:41 +0800 Subject: [PATCH] Fix resource leak (#8337) * Fix resource leak * Patch comments --- .../SequenceInputStreamResponseHandler.java | 15 ++++++- ...istBasedDruidToTimelineEventConverter.java | 11 +++-- .../cassandra/CassandraDataSegmentPusher.java | 7 +-- .../druid/storage/google/GoogleTaskLogs.java | 41 ++++++++--------- .../apache/druid/guice/PropertiesModule.java | 4 +- .../epinephelinae/SpillingGrouper.java | 6 ++- .../segment/data/GenericIndexedWriter.java | 10 ++--- .../druid/server/security/TLSUtils.java | 44 +++++++++++-------- 8 files changed, 81 insertions(+), 57 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java index 203a0d2aed6..c3247d69832 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java @@ -20,6 +20,7 @@ package org.apache.druid.java.util.http.client.response; import com.google.common.io.ByteSource; +import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.logger.Logger; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBufferInputStream; @@ -56,14 +57,19 @@ public class SequenceInputStreamResponseHandler implements HttpResponseHandler handleResponse(HttpResponse response, TrafficCop trafficCop) { + ChannelBufferInputStream channelStream = null; try { - queue.put(new ChannelBufferInputStream(response.getContent())); + channelStream = new ChannelBufferInputStream(response.getContent()); + queue.put(channelStream); } catch (InterruptedException e) { log.error(e, "Queue appending interrupted"); Thread.currentThread().interrupt(); throw new RuntimeException(e); } + finally { + CloseQuietly.close(channelStream); + } byteCount.addAndGet(response.getContent().readableBytes()); return ClientResponse.finished( new SequenceInputStream( @@ -106,8 +112,10 @@ public class SequenceInputStreamResponseHandler implements HttpResponseHandler 0) { + ChannelBufferInputStream channelStream = null; try { - queue.put(new ChannelBufferInputStream(channelBuffer)); + channelStream = new ChannelBufferInputStream(channelBuffer); + queue.put(channelStream); // Queue.size() can be expensive in some implementations, but LinkedBlockingQueue.size is just an AtomicLong log.debug("Added stream. Queue length %d", queue.size()); } @@ -116,6 +124,9 @@ public class SequenceInputStreamResponseHandler implements HttpResponseHandler) () -> { - storage.insert(config.getBucket(), taskKey, mediaContent); - return null; - }, - GoogleUtils::isRetryable, - 1, - 5 - ); - } - catch (IOException e) { - throw e; - } - catch (Exception e) { - throw new RE(e, "Failed to upload [%s] to [%s]", logFile, taskKey); + try { + RetryUtils.retry( + (RetryUtils.Task) () -> { + storage.insert(config.getBucket(), taskKey, mediaContent); + return null; + }, + GoogleUtils::isRetryable, + 1, + 5 + ); + } + catch (IOException e) { + throw e; + } + catch (Exception e) { + throw new RE(e, "Failed to upload [%s] to [%s]", logFile, taskKey); + } } } diff --git a/processing/src/main/java/org/apache/druid/guice/PropertiesModule.java b/processing/src/main/java/org/apache/druid/guice/PropertiesModule.java index 79d10dfeaaf..9fe6fec5dd1 100644 --- a/processing/src/main/java/org/apache/druid/guice/PropertiesModule.java +++ b/processing/src/main/java/org/apache/druid/guice/PropertiesModule.java @@ -69,8 +69,8 @@ public class PropertiesModule implements Module if (stream != null) { log.info("Loading properties from %s", propertiesFile); - try { - fileProps.load(new InputStreamReader(stream, StandardCharsets.UTF_8)); + try (final InputStreamReader in = new InputStreamReader(stream, StandardCharsets.UTF_8)) { + fileProps.load(in); } catch (IOException e) { throw new RuntimeException(e); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java index 249060a5b8c..c3f0fba7721 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -40,7 +40,9 @@ import org.apache.druid.segment.ColumnSelectorFactory; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; @@ -215,8 +217,10 @@ public class SpillingGrouper implements Grouper for (File dictFile : dictionaryFiles) { try ( + final InputStream fileStream = Files.newInputStream(dictFile.toPath()); + final LZ4BlockInputStream blockStream = new LZ4BlockInputStream(fileStream); final MappingIterator dictIterator = spillMapper.readValues( - spillMapper.getFactory().createParser(new LZ4BlockInputStream(new FileInputStream(dictFile))), + spillMapper.getFactory().createParser(blockStream), spillMapper.getTypeFactory().constructType(String.class) ) ) { diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java index 3974915b043..fd5c481f6cc 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java @@ -35,7 +35,6 @@ import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.WriteOutBytes; import javax.annotation.Nullable; -import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; @@ -457,10 +456,11 @@ public class GenericIndexedWriter implements Serializer private void initializeHeaderOutLong() throws IOException { headerOutLong = new LongArrayList(); - DataInput headerOutAsIntInput = new DataInputStream(headerOut.asInputStream()); - for (int i = 0; i < numWritten; i++) { - int count = headerOutAsIntInput.readInt(); - headerOutLong.add(count); + try (final DataInputStream headerOutAsIntInput = new DataInputStream(headerOut.asInputStream())) { + for (int i = 0; i < numWritten; i++) { + int count = headerOutAsIntInput.readInt(); + headerOutLong.add(count); + } } } diff --git a/server/src/main/java/org/apache/druid/server/security/TLSUtils.java b/server/src/main/java/org/apache/druid/server/security/TLSUtils.java index 4ff34af8545..ea85b99b499 100644 --- a/server/src/main/java/org/apache/druid/server/security/TLSUtils.java +++ b/server/src/main/java/org/apache/druid/server/security/TLSUtils.java @@ -32,8 +32,10 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509ExtendedKeyManager; import javax.net.ssl.X509ExtendedTrustManager; -import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; import java.security.KeyManagementException; import java.security.KeyStore; import java.security.KeyStoreException; @@ -183,10 +185,12 @@ public class TLSUtils KeyStore trustStore = KeyStore.getInstance(trustStoreType == null ? KeyStore.getDefaultType() : trustStoreType); - trustStore.load( - new FileInputStream(trustStorePath), - trustStorePasswordProvider == null ? null : trustStorePasswordProvider.getPassword().toCharArray() - ); + try (final InputStream trustStoreFileStream = Files.newInputStream(Paths.get(trustStorePath))) { + trustStore.load( + trustStoreFileStream, + trustStorePasswordProvider == null ? null : trustStorePasswordProvider.getPassword().toCharArray() + ); + } TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(trustStoreAlgorithm == null ? TrustManagerFactory.getDefaultAlgorithm() : trustStoreAlgorithm); @@ -197,20 +201,24 @@ public class TLSUtils KeyStore keyStore = KeyStore.getInstance(keyStoreType == null ? KeyStore.getDefaultType() : keyStoreType); - keyStore.load( - new FileInputStream(keyStorePath), - keyStorePasswordProvider == null ? null : keyStorePasswordProvider.getPassword().toCharArray() - ); + try (final InputStream keyStoreFileStream = Files.newInputStream(Paths.get(keyStorePath))) { + keyStore.load( + keyStoreFileStream, + keyStorePasswordProvider == null ? null : keyStorePasswordProvider.getPassword().toCharArray() + ); - KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance( - keyStoreAlgorithm == null ? - KeyManagerFactory.getDefaultAlgorithm() : keyStoreAlgorithm - ); - keyManagerFactory.init( - keyStore, - keyManagerFactoryPasswordProvider == null ? null : keyManagerFactoryPasswordProvider.getPassword().toCharArray() - ); - keyManagers = createAliasedKeyManagers(keyManagerFactory.getKeyManagers(), certAlias); + KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance( + keyStoreAlgorithm == null ? + KeyManagerFactory.getDefaultAlgorithm() : keyStoreAlgorithm + ); + keyManagerFactory.init( + keyStore, + keyManagerFactoryPasswordProvider == null + ? null + : keyManagerFactoryPasswordProvider.getPassword().toCharArray() + ); + keyManagers = createAliasedKeyManagers(keyManagerFactory.getKeyManagers(), certAlias); + } } else { keyManagers = null; }