Fix resource leaks (#3702)

This commit is contained in:
Roman Leventov 2016-11-18 09:51:36 -06:00 committed by Nishant
parent 7e80d1045a
commit 7b56cec3b9
12 changed files with 39 additions and 84 deletions

View File

@ -31,16 +31,15 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.ImmutableSortedMap;
import com.google.common.io.CharStreams;
import com.google.common.io.Files; import com.google.common.io.Files;
import com.google.common.io.Resources;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.logger.Logger;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.net.URL;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -252,11 +251,10 @@ public class WhiteListBasedConverter implements DruidToGraphiteEventConverter
String actualPath = mapPath; String actualPath = mapPath;
try { try {
if (Strings.isNullOrEmpty(mapPath)) { if (Strings.isNullOrEmpty(mapPath)) {
actualPath = this.getClass().getClassLoader().getResource("defaultWhiteListMap.json").getFile(); URL resource = this.getClass().getClassLoader().getResource("defaultWhiteListMap.json");
actualPath = resource.getFile();
LOGGER.info("using default whiteList map located at [%s]", actualPath); LOGGER.info("using default whiteList map located at [%s]", actualPath);
fileContent = CharStreams.toString(new InputStreamReader(this.getClass() fileContent = Resources.toString(resource, Charset.defaultCharset());
.getClassLoader()
.getResourceAsStream("defaultWhiteListMap.json")));
} else { } else {
fileContent = Files.asCharSource(new File(mapPath), Charset.forName("UTF-8")).read(); fileContent = Files.asCharSource(new File(mapPath), Charset.forName("UTF-8")).read();
} }

View File

@ -23,7 +23,6 @@ import com.google.common.base.Predicate;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.io.ByteSource; import com.google.common.io.ByteSource;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.druid.java.util.common.CompressionUtils; import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.FileUtils; import io.druid.java.util.common.FileUtils;
import io.druid.java.util.common.IAE; import io.druid.java.util.common.IAE;
@ -36,7 +35,6 @@ import io.druid.segment.loading.URIDataPuller;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.RemoteIterator;
@ -177,7 +175,6 @@ public class HdfsDataSegmentPuller implements DataSegmentPuller, URIDataPuller
public FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) throws SegmentLoadingException public FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) throws SegmentLoadingException
{ {
final LocalFileSystem localFileSystem = new LocalFileSystem();
try { try {
final FileSystem fs = path.getFileSystem(config); final FileSystem fs = path.getFileSystem(config);
if (fs.isDirectory(path)) { if (fs.isDirectory(path)) {

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory; import io.druid.data.input.FirehoseFactory;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolbox;
@ -137,14 +138,15 @@ public class NoopTask extends AbstractTask
{ {
if (firehoseFactory != null) { if (firehoseFactory != null) {
log.info("Connecting firehose"); log.info("Connecting firehose");
firehoseFactory.connect(null);
} }
try (Firehose firehose = firehoseFactory != null ? firehoseFactory.connect(null) : null) {
log.info("Running noop task[%s]", getId()); log.info("Running noop task[%s]", getId());
log.info("Sleeping for %,d millis.", runTime); log.info("Sleeping for %,d millis.", runTime);
Thread.sleep(runTime); Thread.sleep(runTime);
log.info("Woke up!"); log.info("Woke up!");
return TaskStatus.success(getId()); return TaskStatus.success(getId());
}
} }
public static NoopTask create() public static NoopTask create()

View File

@ -41,31 +41,16 @@ public class PortFinder
private static boolean canBind(int portNum) private static boolean canBind(int portNum)
{ {
ServerSocket ss = null;
boolean isFree = false;
try { try {
ss = new ServerSocket(portNum); new ServerSocket(portNum).close();
isFree = true; return true;
} }
catch (BindException be) { catch (BindException be) {
isFree = false; // port in use, return false;
} }
catch (IOException e) { catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
finally {
if (ss != null) {
while (!ss.isClosed()) {
try {
ss.close();
}
catch (IOException e) {
// ignore
}
}
}
}
return isFree;
} }
public synchronized int findUnusedPort() public synchronized int findUnusedPort()

View File

@ -1,34 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.java.util.common;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
public class Props {
public static Properties fromFilename(String filename) throws IOException {
final Properties props = new Properties();
props.load(new FileInputStream(filename));
return props;
}
}

View File

@ -119,7 +119,8 @@ public class ByteBufferWriter<T> implements Closeable
public void writeToChannel(WritableByteChannel channel) throws IOException public void writeToChannel(WritableByteChannel channel) throws IOException
{ {
final ReadableByteChannel from = Channels.newChannel(combineStreams().getInput()); try (final ReadableByteChannel from = Channels.newChannel(combineStreams().getInput())) {
ByteStreams.copy(from, channel); ByteStreams.copy(from, channel);
}
} }
} }

View File

@ -129,7 +129,8 @@ public class CompressedIntsIndexedWriter extends SingleValueIndexedIntsWriter
channel.write(ByteBuffer.wrap(Ints.toByteArray(numInserted))); channel.write(ByteBuffer.wrap(Ints.toByteArray(numInserted)));
channel.write(ByteBuffer.wrap(Ints.toByteArray(chunkFactor))); channel.write(ByteBuffer.wrap(Ints.toByteArray(chunkFactor)));
channel.write(ByteBuffer.wrap(new byte[]{compression.getId()})); channel.write(ByteBuffer.wrap(new byte[]{compression.getId()}));
final ReadableByteChannel from = Channels.newChannel(flattener.combineStreams().getInput()); try (final ReadableByteChannel from = Channels.newChannel(flattener.combineStreams().getInput())) {
ByteStreams.copy(from, channel); ByteStreams.copy(from, channel);
}
} }
} }

View File

@ -144,7 +144,8 @@ public class CompressedVSizeIntsIndexedWriter extends SingleValueIndexedIntsWrit
channel.write(ByteBuffer.wrap(Ints.toByteArray(numInserted))); channel.write(ByteBuffer.wrap(Ints.toByteArray(numInserted)));
channel.write(ByteBuffer.wrap(Ints.toByteArray(chunkFactor))); channel.write(ByteBuffer.wrap(Ints.toByteArray(chunkFactor)));
channel.write(ByteBuffer.wrap(new byte[]{compression.getId()})); channel.write(ByteBuffer.wrap(new byte[]{compression.getId()}));
final ReadableByteChannel from = Channels.newChannel(flattener.combineStreams().getInput()); try (final ReadableByteChannel from = Channels.newChannel(flattener.combineStreams().getInput())) {
ByteStreams.copy(from, channel); ByteStreams.copy(from, channel);
}
} }
} }

View File

@ -160,7 +160,8 @@ public class GenericIndexedWriter<T> implements Closeable
public void writeToChannel(WritableByteChannel channel) throws IOException public void writeToChannel(WritableByteChannel channel) throws IOException
{ {
final ReadableByteChannel from = Channels.newChannel(combineStreams().getInput()); try (final ReadableByteChannel from = Channels.newChannel(combineStreams().getInput())) {
ByteStreams.copy(from, channel); ByteStreams.copy(from, channel);
}
} }
} }

View File

@ -117,10 +117,11 @@ public class IntermediateLongSupplierSerializer implements LongSupplierSerialize
); );
} }
DataInputStream tempIn = new DataInputStream(new BufferedInputStream(ioPeon.makeInputStream(tempFile))); try (DataInputStream tempIn = new DataInputStream(new BufferedInputStream(ioPeon.makeInputStream(tempFile)))) {
delegate.open(); delegate.open();
while (tempIn.available() > 0) { while (tempIn.available() > 0) {
delegate.add(tempIn.readLong()); delegate.add(tempIn.readLong());
}
} }
} }

View File

@ -88,7 +88,8 @@ public class VSizeIndexedIntsWriter extends SingleValueIndexedIntsWriter
long numBytesWritten = valuesOut.getCount(); long numBytesWritten = valuesOut.getCount();
channel.write(ByteBuffer.wrap(new byte[]{VERSION, (byte) numBytes})); channel.write(ByteBuffer.wrap(new byte[]{VERSION, (byte) numBytes}));
channel.write(ByteBuffer.wrap(Ints.toByteArray((int) numBytesWritten))); channel.write(ByteBuffer.wrap(Ints.toByteArray((int) numBytesWritten)));
final ReadableByteChannel from = Channels.newChannel(ioPeon.makeInputStream(valueFileName)); try (final ReadableByteChannel from = Channels.newChannel(ioPeon.makeInputStream(valueFileName))) {
ByteStreams.copy(from, channel); ByteStreams.copy(from, channel);
}
} }
} }

View File

@ -165,7 +165,8 @@ public class VSizeIndexedWriter extends MultiValueIndexedIntsWriter implements C
@Override @Override
public void writeToChannel(WritableByteChannel channel) throws IOException public void writeToChannel(WritableByteChannel channel) throws IOException
{ {
final ReadableByteChannel from = Channels.newChannel(combineStreams().getInput()); try (final ReadableByteChannel from = Channels.newChannel(combineStreams().getInput())) {
ByteStreams.copy(from, channel); ByteStreams.copy(from, channel);
}
} }
} }