Resource leak in DruidSegmentReader (#9476)

* Close the Yielder in DruidSegmentReader

* forbidden api
This commit is contained in:
Jihoon Son 2020-03-09 10:05:25 -07:00 committed by GitHub
parent a677664811
commit f456d2fcf8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 93 additions and 4 deletions

View File

@ -19,6 +19,7 @@
package org.apache.druid.indexing.input; package org.apache.druid.indexing.input;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntity;
@ -33,6 +34,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.DefaultDimensionSpec;
@ -147,13 +149,14 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
} }
/** /**
* @param sequence A sequence of intermediate rows generated from a sequence of * @param sequence A sequence of intermediate rows generated from a sequence of
* cursors in {@link #intermediateRowIterator()} * cursors in {@link #intermediateRowIterator()}
* @param segmentFile The underlying segment file containing the row data * @param segmentFile The underlying segment file containing the row data
* @return A CloseableIterator from a sequence of intermediate rows, closing the underlying segment file * @return A CloseableIterator from a sequence of intermediate rows, closing the underlying segment file
* when the iterator is closed. * when the iterator is closed.
*/ */
private static CloseableIterator<Map<String, Object>> makeCloseableIteratorFromSequenceAndSegmentFile( @VisibleForTesting
static CloseableIterator<Map<String, Object>> makeCloseableIteratorFromSequenceAndSegmentFile(
final Sequence<Map<String, Object>> sequence, final Sequence<Map<String, Object>> sequence,
final CleanableFile segmentFile final CleanableFile segmentFile
) )
@ -179,7 +182,10 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
segmentFile.close(); Closer closer = Closer.create();
closer.register(rowYielder);
closer.register(segmentFile);
closer.close();
} }
}; };
} }

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.input;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.druid.data.input.InputEntity.CleanableFile;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.BaseSequence.IteratorMaker;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
public class DruidSegmentReaderTest
{
@Test
public void testMakeCloseableIteratorFromSequenceAndSegmentFileCloseYielderOnClose() throws IOException
{
MutableBoolean isSequenceClosed = new MutableBoolean(false);
MutableBoolean isFileClosed = new MutableBoolean(false);
Sequence<Map<String, Object>> sequence = new BaseSequence<>(
new IteratorMaker<Map<String, Object>, Iterator<Map<String, Object>>>()
{
@Override
public Iterator<Map<String, Object>> make()
{
return Collections.emptyIterator();
}
@Override
public void cleanup(Iterator<Map<String, Object>> iterFromMake)
{
isSequenceClosed.setValue(true);
}
}
);
CleanableFile cleanableFile = new CleanableFile()
{
@Override
public File file()
{
return null;
}
@Override
public void close()
{
isFileClosed.setValue(true);
}
};
try (CloseableIterator<Map<String, Object>> iterator =
DruidSegmentReader.makeCloseableIteratorFromSequenceAndSegmentFile(sequence, cleanableFile)) {
while (iterator.hasNext()) {
iterator.next();
}
}
Assert.assertTrue("File is not closed", isFileClosed.booleanValue());
Assert.assertTrue("Sequence is not closed", isSequenceClosed.booleanValue());
}
}