Fix potential resource leak in ParquetReader (#9852)

* Fix potential resource leak in ParquetReader

* add test

* never thrown exception

* catch potential exceptions
This commit is contained in:
Jihoon Son 2020-05-16 09:57:12 -07:00 committed by GitHub
parent 0a8bf83bc5
commit 46beaa0640
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 148 additions and 19 deletions

View File

@ -32,7 +32,6 @@ import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.guice.annotations.UnstableApi;
import java.io.File;
import java.io.IOException;
/**
* InputFormat abstracts the file format of input data.
@ -64,5 +63,5 @@ public interface InputFormat
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory
) throws IOException;
);
}

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Objects;
public class ParquetInputFormat extends NestedInputFormat
@ -69,7 +68,7 @@ public class ParquetInputFormat extends NestedInputFormat
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory
) throws IOException
)
{
return new ParquetReader(conf, inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString);
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.data.input.parquet;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntity.CleanableFile;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.IntermediateRowParsingReader;
@ -45,12 +46,12 @@ import java.util.NoSuchElementException;
public class ParquetReader extends IntermediateRowParsingReader<Group>
{
private final Configuration conf;
private final InputRowSchema inputRowSchema;
private final InputEntity source;
private final File temporaryDirectory;
private final ObjectFlattener<Group> flattener;
private final org.apache.parquet.hadoop.ParquetReader<Group> reader;
private final Closer closer;
ParquetReader(
Configuration conf,
InputRowSchema inputRowSchema,
@ -58,31 +59,41 @@ public class ParquetReader extends IntermediateRowParsingReader<Group>
File temporaryDirectory,
JSONPathSpec flattenSpec,
boolean binaryAsString
) throws IOException
)
{
this.conf = conf;
this.inputRowSchema = inputRowSchema;
this.source = source;
this.temporaryDirectory = temporaryDirectory;
this.flattener = ObjectFlatteners.create(flattenSpec, new ParquetGroupFlattenerMaker(binaryAsString));
}
closer = Closer.create();
@Override
protected CloseableIterator<Group> intermediateRowIterator() throws IOException
{
final Closer closer = Closer.create();
byte[] buffer = new byte[InputEntity.DEFAULT_FETCH_BUFFER_SIZE];
final InputEntity.CleanableFile file = closer.register(source.fetch(temporaryDirectory, buffer));
final ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
final org.apache.parquet.hadoop.ParquetReader<Group> reader;
try {
final CleanableFile file = closer.register(source.fetch(temporaryDirectory, buffer));
final Path path = new Path(file.file().toURI());
final ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
reader = closer.register(org.apache.parquet.hadoop.ParquetReader.builder(new GroupReadSupport(), path)
.withConf(conf)
.build());
}
catch (Exception e) {
// We don't expect to see any exceptions thrown in the above try clause,
// but we catch it just in case to avoid any potential resource leak.
closer.close();
throw new RuntimeException(e);
}
finally {
Thread.currentThread().setContextClassLoader(currentClassLoader);
}
}
@Override
protected CloseableIterator<Group> intermediateRowIterator()
{
return new CloseableIterator<Group>()
{
Group value = null;

View File

@ -39,7 +39,7 @@ class BaseParquetReaderTest
{
ObjectWriter DEFAULT_JSON_WRITER = new ObjectMapper().writerWithDefaultPrettyPrinter();
InputEntityReader createReader(String parquetFile, InputRowSchema schema, JSONPathSpec flattenSpec) throws IOException
InputEntityReader createReader(String parquetFile, InputRowSchema schema, JSONPathSpec flattenSpec)
{
return createReader(parquetFile, schema, flattenSpec, false);
}
@ -49,7 +49,7 @@ class BaseParquetReaderTest
InputRowSchema schema,
JSONPathSpec flattenSpec,
boolean binaryAsString
) throws IOException
)
{
FileEntity entity = new FileEntity(new File(parquetFile));
ParquetInputFormat parquet = new ParquetInputFormat(flattenSpec, binaryAsString, new Configuration());

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.parquet;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.FileEntity;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Objects;
public class ParquetReaderResourceLeakTest extends BaseParquetReaderTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Test
public void testFetchOnReadCleanupAfterExhaustingIterator() throws IOException
{
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(ImmutableList.of("page", "language", "user", "unpatrolled"))
),
Collections.emptyList()
);
FetchingFileEntity entity = new FetchingFileEntity(new File("example/wiki/wiki.parquet"));
ParquetInputFormat parquet = new ParquetInputFormat(JSONPathSpec.DEFAULT, false, new Configuration());
File tempDir = temporaryFolder.newFolder();
InputEntityReader reader = parquet.createReader(schema, entity, tempDir);
Assert.assertEquals(0, Objects.requireNonNull(tempDir.list()).length);
try (CloseableIterator<InputRow> iterator = reader.read()) {
Assert.assertTrue(Objects.requireNonNull(tempDir.list()).length > 0);
while (iterator.hasNext()) {
iterator.next();
}
}
Assert.assertEquals(0, Objects.requireNonNull(tempDir.list()).length);
}
private static class FetchingFileEntity extends FileEntity
{
private FetchingFileEntity(File file)
{
super(file);
}
@Override
public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer)
{
// Copied from InputEntity
try {
final File tempFile = File.createTempFile("druid-input-entity", ".tmp", temporaryDirectory);
try (InputStream is = open()) {
FileUtils.copyLarge(
is,
tempFile,
fetchBuffer,
getRetryCondition(),
DEFAULT_MAX_NUM_FETCH_TRIES,
StringUtils.format("Failed to fetch into [%s]", tempFile.getAbsolutePath())
);
}
return new CleanableFile()
{
@Override
public File file()
{
return tempFile;
}
@Override
public void close()
{
if (!tempFile.delete()) {
LOG.warn("Failed to remove file[%s]", tempFile.getAbsolutePath());
}
}
};
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}

View File

@ -60,7 +60,7 @@ class SettableByteEntityReader implements InputEntityReader
this.indexingTmpDir = indexingTmpDir;
}
void setEntity(ByteEntity entity) throws IOException
void setEntity(ByteEntity entity)
{
this.delegate = new TransformingInputEntityReader(
// Yes, we are creating a new reader for every stream chunk.