From 46beaa06408f814936ce5171bbec25887d7aec94 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Sat, 16 May 2020 09:57:12 -0700
Subject: [PATCH] Fix potential resource leak in ParquetReader (#9852)

* Fix potential resource leak in ParquetReader

* add test

* never thrown exception

* catch potential exceptions
---
 .../apache/druid/data/input/InputFormat.java  |   3 +-
 .../input/parquet/ParquetInputFormat.java     |   3 +-
 .../data/input/parquet/ParquetReader.java     |  35 +++--
 .../input/parquet/BaseParquetReaderTest.java  |   4 +-
 .../ParquetReaderResourceLeakTest.java        | 120 ++++++++++++++++++
 .../SettableByteEntityReader.java             |   2 +-
 6 files changed, 148 insertions(+), 19 deletions(-)
 create mode 100644 extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetReaderResourceLeakTest.java

diff --git a/core/src/main/java/org/apache/druid/data/input/InputFormat.java b/core/src/main/java/org/apache/druid/data/input/InputFormat.java
index a3a5dd24117..485a76ba666 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputFormat.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputFormat.java
@@ -32,7 +32,6 @@ import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.guice.annotations.UnstableApi;
 
 import java.io.File;
-import java.io.IOException;
 
 /**
  * InputFormat abstracts the file format of input data.
@@ -64,5 +63,5 @@ public interface InputFormat
       InputRowSchema inputRowSchema,
       InputEntity source,
       File temporaryDirectory
-  ) throws IOException;
+  );
 }
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java
index c49581a9b72..1de4c552ac3 100644
--- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration;
 import javax.annotation.Nullable;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.Objects;
 
 public class ParquetInputFormat extends NestedInputFormat
@@ -69,7 +68,7 @@ public class ParquetInputFormat extends NestedInputFormat
       InputRowSchema inputRowSchema,
       InputEntity source,
       File temporaryDirectory
-  ) throws IOException
+  )
   {
     return new ParquetReader(conf, inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString);
   }
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
index 2c76253982f..1bdd0114ddd 100644
--- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
@@ -20,6 +20,7 @@
 package org.apache.druid.data.input.parquet;
 
 import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntity.CleanableFile;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.IntermediateRowParsingReader;
@@ -45,12 +46,12 @@ import java.util.NoSuchElementException;
 
 public class ParquetReader extends IntermediateRowParsingReader<Group>
 {
+  private final Configuration conf;
   private final InputRowSchema inputRowSchema;
+  private final InputEntity source;
+  private final File temporaryDirectory;
   private final ObjectFlattener<Group> flattener;
 
-  private final org.apache.parquet.hadoop.ParquetReader<Group> reader;
-  private final Closer closer;
-
   ParquetReader(
       Configuration conf,
       InputRowSchema inputRowSchema,
@@ -58,31 +59,41 @@ public class ParquetReader extends IntermediateRowParsingReader<Group>
       File temporaryDirectory,
       JSONPathSpec flattenSpec,
       boolean binaryAsString
-  ) throws IOException
+  )
   {
+    this.conf = conf;
     this.inputRowSchema = inputRowSchema;
+    this.source = source;
+    this.temporaryDirectory = temporaryDirectory;
     this.flattener = ObjectFlatteners.create(flattenSpec, new ParquetGroupFlattenerMaker(binaryAsString));
+  }
 
-    closer = Closer.create();
+  @Override
+  protected CloseableIterator<Group> intermediateRowIterator() throws IOException
+  {
+    final Closer closer = Closer.create();
     byte[] buffer = new byte[InputEntity.DEFAULT_FETCH_BUFFER_SIZE];
-    final InputEntity.CleanableFile file = closer.register(source.fetch(temporaryDirectory, buffer));
-    final Path path = new Path(file.file().toURI());
-
     final ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
+    final org.apache.parquet.hadoop.ParquetReader<Group> reader;
     try {
+      final CleanableFile file = closer.register(source.fetch(temporaryDirectory, buffer));
+      final Path path = new Path(file.file().toURI());
+
       Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
       reader = closer.register(org.apache.parquet.hadoop.ParquetReader.builder(new GroupReadSupport(), path)
                                                                       .withConf(conf)
                                                                       .build());
     }
+    catch (Exception e) {
+      // We don't expect to see any exceptions thrown in the above try clause,
+      // but we catch it just in case to avoid any potential resource leak.
+      closer.close();
+      throw new RuntimeException(e);
+    }
     finally {
       Thread.currentThread().setContextClassLoader(currentClassLoader);
     }
-  }
 
-  @Override
-  protected CloseableIterator<Group> intermediateRowIterator()
-  {
     return new CloseableIterator<Group>()
     {
       Group value = null;
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java
index 87f9229b313..b96cb7b2ddb 100644
--- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java
@@ -39,7 +39,7 @@ class BaseParquetReaderTest
 {
   ObjectWriter DEFAULT_JSON_WRITER = new ObjectMapper().writerWithDefaultPrettyPrinter();
 
-  InputEntityReader createReader(String parquetFile, InputRowSchema schema, JSONPathSpec flattenSpec) throws IOException
+  InputEntityReader createReader(String parquetFile, InputRowSchema schema, JSONPathSpec flattenSpec)
   {
     return createReader(parquetFile, schema, flattenSpec, false);
   }
@@ -49,7 +49,7 @@ class BaseParquetReaderTest
       InputRowSchema schema,
       JSONPathSpec flattenSpec,
       boolean binaryAsString
-  ) throws IOException
+  )
   {
     FileEntity entity = new FileEntity(new File(parquetFile));
     ParquetInputFormat parquet = new ParquetInputFormat(flattenSpec, binaryAsString, new Configuration());
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetReaderResourceLeakTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetReaderResourceLeakTest.java
new file mode 100644
index 00000000000..251fa344bb7
--- /dev/null
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetReaderResourceLeakTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.parquet;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FileEntity;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Objects;
+
+public class ParquetReaderResourceLeakTest extends BaseParquetReaderTest
+{
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Test
+  public void testFetchOnReadCleanupAfterExhaustingIterator() throws IOException
+  {
+    InputRowSchema schema = new InputRowSchema(
+        new TimestampSpec("timestamp", "iso", null),
+        new DimensionsSpec(
+            DimensionsSpec.getDefaultSchemas(ImmutableList.of("page", "language", "user", "unpatrolled"))
+        ),
+        Collections.emptyList()
+    );
+    FetchingFileEntity entity = new FetchingFileEntity(new File("example/wiki/wiki.parquet"));
+    ParquetInputFormat parquet = new ParquetInputFormat(JSONPathSpec.DEFAULT, false, new Configuration());
+    File tempDir = temporaryFolder.newFolder();
+    InputEntityReader reader = parquet.createReader(schema, entity, tempDir);
+    Assert.assertEquals(0, Objects.requireNonNull(tempDir.list()).length);
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      Assert.assertTrue(Objects.requireNonNull(tempDir.list()).length > 0);
+      while (iterator.hasNext()) {
+        iterator.next();
+      }
+    }
+    Assert.assertEquals(0, Objects.requireNonNull(tempDir.list()).length);
+  }
+
+  private static class FetchingFileEntity extends FileEntity
+  {
+    private FetchingFileEntity(File file)
+    {
+      super(file);
+    }
+
+    @Override
+    public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer)
+    {
+      // Copied from InputEntity
+      try {
+        final File tempFile = File.createTempFile("druid-input-entity", ".tmp", temporaryDirectory);
+        try (InputStream is = open()) {
+          FileUtils.copyLarge(
+              is,
+              tempFile,
+              fetchBuffer,
+              getRetryCondition(),
+              DEFAULT_MAX_NUM_FETCH_TRIES,
+              StringUtils.format("Failed to fetch into [%s]", tempFile.getAbsolutePath())
+          );
+        }
+
+        return new CleanableFile()
+        {
+          @Override
+          public File file()
+          {
+            return tempFile;
+          }
+
+          @Override
+          public void close()
+          {
+            if (!tempFile.delete()) {
+              LOG.warn("Failed to remove file[%s]", tempFile.getAbsolutePath());
+            }
+          }
+        };
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java
index e34ee672728..25d752dad95 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java
@@ -60,7 +60,7 @@ class SettableByteEntityReader implements InputEntityReader
     this.indexingTmpDir = indexingTmpDir;
   }
 
-  void setEntity(ByteEntity entity) throws IOException
+  void setEntity(ByteEntity entity)
   {
     this.delegate = new TransformingInputEntityReader(
         // Yes, we are creating a new reader for every stream chunk.
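
The essence of the patch is an open-lazily, register-immediately, close-on-failure idiom: resources are no longer opened in the constructor, every resource is handed to a Closer the moment it is created, and the Closer is closed if any later step throws. The sketch below illustrates that idiom only; it is not Druid code. It uses Guava's com.google.common.io.Closer (Druid ships its own Closer with the same create/register/close contract), and the class and method names are hypothetical.

import com.google.common.io.Closer;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class LazyOpenSketch
{
  /**
   * Opens resources only when iteration actually starts, mirroring the patch:
   * each resource is registered with the Closer as soon as it is created, so a
   * half-built reader never leaks an open stream or a fetched temp file.
   */
  static Closeable openResources(Path fetchedFile) throws IOException
  {
    final Closer closer = Closer.create();
    try {
      // Registered immediately; closer.close() releases it even if a later step throws.
      final BufferedReader reader = closer.register(Files.newBufferedReader(fetchedFile));
      reader.readLine(); // stand-in for building the real reader
      return closer;     // the caller closes everything through this single handle
    }
    catch (Exception e) {
      // Same defensive pattern as the patch: nothing above is expected to throw,
      // but if it does, release whatever was already registered.
      closer.close();
      throw new RuntimeException(e);
    }
  }
}

The new ParquetReaderResourceLeakTest checks the same property from the outside: the fetched temp file exists in the temporary directory while the iterator is open and is gone once the iterator is closed.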