Fix resource leak (#8337)

* Fix resource leak

* Patch comments
This commit is contained in:
Benedict Jin 2019-08-20 17:55:41 +08:00 committed by Roman Leventov
parent e64070b8bf
commit 781873ba53
8 changed files with 81 additions and 57 deletions

View File

@ -20,6 +20,7 @@
package org.apache.druid.java.util.http.client.response; package org.apache.druid.java.util.http.client.response;
import com.google.common.io.ByteSource; import com.google.common.io.ByteSource;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream; import org.jboss.netty.buffer.ChannelBufferInputStream;
@ -56,14 +57,19 @@ public class SequenceInputStreamResponseHandler implements HttpResponseHandler<I
@Override @Override
public ClientResponse<InputStream> handleResponse(HttpResponse response, TrafficCop trafficCop) public ClientResponse<InputStream> handleResponse(HttpResponse response, TrafficCop trafficCop)
{ {
ChannelBufferInputStream channelStream = null;
try { try {
queue.put(new ChannelBufferInputStream(response.getContent())); channelStream = new ChannelBufferInputStream(response.getContent());
queue.put(channelStream);
} }
catch (InterruptedException e) { catch (InterruptedException e) {
log.error(e, "Queue appending interrupted"); log.error(e, "Queue appending interrupted");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new RuntimeException(e); throw new RuntimeException(e);
} }
finally {
CloseQuietly.close(channelStream);
}
byteCount.addAndGet(response.getContent().readableBytes()); byteCount.addAndGet(response.getContent().readableBytes());
return ClientResponse.finished( return ClientResponse.finished(
new SequenceInputStream( new SequenceInputStream(
@ -106,8 +112,10 @@ public class SequenceInputStreamResponseHandler implements HttpResponseHandler<I
final ChannelBuffer channelBuffer = chunk.getContent(); final ChannelBuffer channelBuffer = chunk.getContent();
final int bytes = channelBuffer.readableBytes(); final int bytes = channelBuffer.readableBytes();
if (bytes > 0) { if (bytes > 0) {
ChannelBufferInputStream channelStream = null;
try { try {
queue.put(new ChannelBufferInputStream(channelBuffer)); channelStream = new ChannelBufferInputStream(channelBuffer);
queue.put(channelStream);
// Queue.size() can be expensive in some implementations, but LinkedBlockingQueue.size is just an AtomicLong // Queue.size() can be expensive in some implementations, but LinkedBlockingQueue.size is just an AtomicLong
log.debug("Added stream. Queue length %d", queue.size()); log.debug("Added stream. Queue length %d", queue.size());
} }
@ -116,6 +124,9 @@ public class SequenceInputStreamResponseHandler implements HttpResponseHandler<I
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new RuntimeException(e); throw new RuntimeException(e);
} }
finally {
CloseQuietly.close(channelStream);
}
byteCount.addAndGet(bytes); byteCount.addAndGet(bytes);
} else { } else {
log.debug("Skipping zero length chunk"); log.debug("Skipping zero length chunk");

View File

@ -30,8 +30,8 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.ImmutableSortedMap;
import com.google.common.io.CharStreams;
import com.google.common.io.Files; import com.google.common.io.Files;
import com.google.common.io.Resources;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
@ -39,8 +39,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.net.URL;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -211,10 +210,10 @@ public class WhiteListBasedDruidToTimelineEventConverter implements DruidToTimel
String actualPath = mapPath; String actualPath = mapPath;
try { try {
if (Strings.isNullOrEmpty(mapPath)) { if (Strings.isNullOrEmpty(mapPath)) {
actualPath = this.getClass().getClassLoader().getResource("defaultWhiteListMap.json").getFile(); URL defaultWhiteListMapUrl = this.getClass().getClassLoader().getResource("defaultWhiteListMap.json");
actualPath = defaultWhiteListMapUrl.getFile();
LOGGER.info("using default whiteList map located at [%s]", actualPath); LOGGER.info("using default whiteList map located at [%s]", actualPath);
InputStream byteContent = this.getClass().getClassLoader().getResourceAsStream("defaultWhiteListMap.json"); fileContent = Resources.toString(defaultWhiteListMapUrl, StandardCharsets.UTF_8);
fileContent = CharStreams.toString(new InputStreamReader(byteContent, StandardCharsets.UTF_8));
} else { } else {
fileContent = Files.asCharSource(new File(mapPath), StandardCharsets.UTF_8).read(); fileContent = Files.asCharSource(new File(mapPath), StandardCharsets.UTF_8).read();
} }

View File

@ -34,9 +34,10 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CompressionUtils; import org.apache.druid.utils.CompressionUtils;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.net.URI; import java.net.URI;
import java.nio.file.Files;
import java.util.Map; import java.util.Map;
/** /**
@ -88,9 +89,9 @@ public class CassandraDataSegmentPusher extends CassandraStorage implements Data
int version = SegmentUtils.getVersionFromDir(indexFilesDir); int version = SegmentUtils.getVersionFromDir(indexFilesDir);
try { try (final InputStream fileStream = Files.newInputStream(compressedIndexFile.toPath())) {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile)) ChunkedStorage.newWriter(indexStorage, key, fileStream)
.withConcurrencyLevel(CONCURRENCY).call(); .withConcurrencyLevel(CONCURRENCY).call();
byte[] json = jsonMapper.writeValueAsBytes(segment); byte[] json = jsonMapper.writeValueAsBytes(segment);
MutationBatch mutation = this.keyspace.prepareMutationBatch(); MutationBatch mutation = this.keyspace.prepareMutationBatch();

View File

@ -30,9 +30,9 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.tasklogs.TaskLogs; import org.apache.druid.tasklogs.TaskLogs;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.file.Files;
public class GoogleTaskLogs implements TaskLogs public class GoogleTaskLogs implements TaskLogs
{ {
@ -66,7 +66,7 @@ public class GoogleTaskLogs implements TaskLogs
private void pushTaskFile(final File logFile, final String taskKey) throws IOException private void pushTaskFile(final File logFile, final String taskKey) throws IOException
{ {
FileInputStream fileStream = new FileInputStream(logFile); try (final InputStream fileStream = Files.newInputStream(logFile.toPath())) {
InputStreamContent mediaContent = new InputStreamContent("text/plain", fileStream); InputStreamContent mediaContent = new InputStreamContent("text/plain", fileStream);
mediaContent.setLength(logFile.length()); mediaContent.setLength(logFile.length());
@ -89,6 +89,7 @@ public class GoogleTaskLogs implements TaskLogs
throw new RE(e, "Failed to upload [%s] to [%s]", logFile, taskKey); throw new RE(e, "Failed to upload [%s] to [%s]", logFile, taskKey);
} }
} }
}
@Override @Override
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset) throws IOException public Optional<ByteSource> streamTaskLog(final String taskid, final long offset) throws IOException

View File

@ -69,8 +69,8 @@ public class PropertiesModule implements Module
if (stream != null) { if (stream != null) {
log.info("Loading properties from %s", propertiesFile); log.info("Loading properties from %s", propertiesFile);
try { try (final InputStreamReader in = new InputStreamReader(stream, StandardCharsets.UTF_8)) {
fileProps.load(new InputStreamReader(stream, StandardCharsets.UTF_8)); fileProps.load(in);
} }
catch (IOException e) { catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@ -40,7 +40,9 @@ import org.apache.druid.segment.ColumnSelectorFactory;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
@ -215,8 +217,10 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
for (File dictFile : dictionaryFiles) { for (File dictFile : dictionaryFiles) {
try ( try (
final InputStream fileStream = Files.newInputStream(dictFile.toPath());
final LZ4BlockInputStream blockStream = new LZ4BlockInputStream(fileStream);
final MappingIterator<String> dictIterator = spillMapper.readValues( final MappingIterator<String> dictIterator = spillMapper.readValues(
spillMapper.getFactory().createParser(new LZ4BlockInputStream(new FileInputStream(dictFile))), spillMapper.getFactory().createParser(blockStream),
spillMapper.getTypeFactory().constructType(String.class) spillMapper.getTypeFactory().constructType(String.class)
) )
) { ) {

View File

@ -35,7 +35,6 @@ import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.WriteOutBytes; import org.apache.druid.segment.writeout.WriteOutBytes;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.DataInput;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -457,11 +456,12 @@ public class GenericIndexedWriter<T> implements Serializer
private void initializeHeaderOutLong() throws IOException private void initializeHeaderOutLong() throws IOException
{ {
headerOutLong = new LongArrayList(); headerOutLong = new LongArrayList();
DataInput headerOutAsIntInput = new DataInputStream(headerOut.asInputStream()); try (final DataInputStream headerOutAsIntInput = new DataInputStream(headerOut.asInputStream())) {
for (int i = 0; i < numWritten; i++) { for (int i = 0; i < numWritten; i++) {
int count = headerOutAsIntInput.readInt(); int count = headerOutAsIntInput.readInt();
headerOutLong.add(count); headerOutLong.add(count);
} }
} }
}
} }

View File

@ -32,8 +32,10 @@ import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509ExtendedKeyManager; import javax.net.ssl.X509ExtendedKeyManager;
import javax.net.ssl.X509ExtendedTrustManager; import javax.net.ssl.X509ExtendedTrustManager;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyManagementException; import java.security.KeyManagementException;
import java.security.KeyStore; import java.security.KeyStore;
import java.security.KeyStoreException; import java.security.KeyStoreException;
@ -183,10 +185,12 @@ public class TLSUtils
KeyStore trustStore = KeyStore.getInstance(trustStoreType == null KeyStore trustStore = KeyStore.getInstance(trustStoreType == null
? KeyStore.getDefaultType() ? KeyStore.getDefaultType()
: trustStoreType); : trustStoreType);
try (final InputStream trustStoreFileStream = Files.newInputStream(Paths.get(trustStorePath))) {
trustStore.load( trustStore.load(
new FileInputStream(trustStorePath), trustStoreFileStream,
trustStorePasswordProvider == null ? null : trustStorePasswordProvider.getPassword().toCharArray() trustStorePasswordProvider == null ? null : trustStorePasswordProvider.getPassword().toCharArray()
); );
}
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(trustStoreAlgorithm == null TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(trustStoreAlgorithm == null
? TrustManagerFactory.getDefaultAlgorithm() ? TrustManagerFactory.getDefaultAlgorithm()
: trustStoreAlgorithm); : trustStoreAlgorithm);
@ -197,8 +201,9 @@ public class TLSUtils
KeyStore keyStore = KeyStore.getInstance(keyStoreType == null KeyStore keyStore = KeyStore.getInstance(keyStoreType == null
? KeyStore.getDefaultType() ? KeyStore.getDefaultType()
: keyStoreType); : keyStoreType);
try (final InputStream keyStoreFileStream = Files.newInputStream(Paths.get(keyStorePath))) {
keyStore.load( keyStore.load(
new FileInputStream(keyStorePath), keyStoreFileStream,
keyStorePasswordProvider == null ? null : keyStorePasswordProvider.getPassword().toCharArray() keyStorePasswordProvider == null ? null : keyStorePasswordProvider.getPassword().toCharArray()
); );
@ -208,9 +213,12 @@ public class TLSUtils
); );
keyManagerFactory.init( keyManagerFactory.init(
keyStore, keyStore,
keyManagerFactoryPasswordProvider == null ? null : keyManagerFactoryPasswordProvider.getPassword().toCharArray() keyManagerFactoryPasswordProvider == null
? null
: keyManagerFactoryPasswordProvider.getPassword().toCharArray()
); );
keyManagers = createAliasedKeyManagers(keyManagerFactory.getKeyManagers(), certAlias); keyManagers = createAliasedKeyManagers(keyManagerFactory.getKeyManagers(), certAlias);
}
} else { } else {
keyManagers = null; keyManagers = null;
} }