free() dictionary merging buffers in IndexMerger (#4684)

* free() dictionary merging buffers in IndexMerger

* Use close() for dictionary merge iterators

* Add comments on buffer free
This commit is contained in:
Jonathan Wei 2017-08-15 10:11:29 -07:00 committed by Gian Merlino
parent 725a144096
commit ab28dc3b97
3 changed files with 29 additions and 11 deletions

View File

@ -29,11 +29,13 @@ import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.inject.ImplementedBy;
import io.druid.common.utils.SerializerUtils;
import io.druid.java.util.common.ByteBufferUtils;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.common.guava.nary.BinaryFn;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.data.Indexed;
@ -418,9 +420,10 @@ public interface IndexMerger
}
}
class DictionaryMergeIterator implements Iterator<String>
class DictionaryMergeIterator implements CloseableIterator<String>
{
protected final IntBuffer[] conversions;
protected final List<Pair<ByteBuffer, Integer>> directBufferAllocations = Lists.newArrayList();
protected final PriorityQueue<Pair<Integer, PeekingIterator<String>>> pQueue;
protected int counter;
@ -446,8 +449,10 @@ public interface IndexMerger
Indexed<String> indexed = dimValueLookups[i];
if (useDirect) {
int allocationSize = indexed.size() * Ints.BYTES;
log.info("Allocating dictionary merging direct buffer with size[%d]", allocationSize);
conversions[i] = ByteBuffer.allocateDirect(allocationSize).asIntBuffer();
log.info("Allocating dictionary merging direct buffer with size[%,d]", allocationSize);
final ByteBuffer conversionDirectBuffer = ByteBuffer.allocateDirect(allocationSize);
conversions[i] = conversionDirectBuffer.asIntBuffer();
directBufferAllocations.add(new Pair<>(conversionDirectBuffer, allocationSize));
} else {
conversions[i] = IntBuffer.allocate(indexed.size());
}
@ -525,5 +530,14 @@ public interface IndexMerger
{
throw new UnsupportedOperationException("remove");
}
@Override
public void close()
{
for (Pair<ByteBuffer, Integer> bufferAllocation : directBufferAllocations) {
log.info("Freeing dictionary merging direct buffer with size[%,d]", bufferAllocation.rhs);
ByteBufferUtils.free(bufferAllocation.lhs);
}
}
}
}

View File

@ -93,6 +93,7 @@ public class StringDimensionMergerV9 implements DimensionMergerV9<int[]>
protected List<IndexableAdapter> adapters;
protected ProgressIndicator progress;
protected final IndexSpec indexSpec;
protected IndexMerger.DictionaryMergeIterator dictionaryMergeIterator;
public StringDimensionMergerV9(
String dimensionName,
@ -167,18 +168,18 @@ public class StringDimensionMergerV9 implements DimensionMergerV9<int[]>
cardinality = 0;
if (numMergeIndex > 1) {
IndexMerger.DictionaryMergeIterator iterator = new IndexMerger.DictionaryMergeIterator(dimValueLookups, true);
dictionaryMergeIterator = new IndexMerger.DictionaryMergeIterator(dimValueLookups, true);
while (iterator.hasNext()) {
dictionaryWriter.write(iterator.next());
while (dictionaryMergeIterator.hasNext()) {
dictionaryWriter.write(dictionaryMergeIterator.next());
}
for (int i = 0; i < adapters.size(); i++) {
if (dimValueLookups[i] != null && iterator.needConversion(i)) {
dimConversions.set(i, iterator.conversions[i]);
if (dimValueLookups[i] != null && dictionaryMergeIterator.needConversion(i)) {
dimConversions.set(i, dictionaryMergeIterator.conversions[i]);
}
}
cardinality = iterator.counter;
cardinality = dictionaryMergeIterator.counter;
} else if (numMergeIndex == 1) {
for (String value : dimValueLookup) {
dictionaryWriter.write(value);
@ -294,6 +295,10 @@ public class StringDimensionMergerV9 implements DimensionMergerV9<int[]>
try (
Closeable toCloseEncodedValueWriter = encodedValueWriter;
Closeable toCloseBitmapWriter = bitmapWriter;
// We need to free the ByteBuffers allocated by the dictionary merge iterator here,
// these buffers are used by dictIdSeeker in mergeBitmaps() below. The iterator is created and only used
// in writeMergedValueMetadata(), but the buffers are still used until after mergeBitmaps().
Closeable toCloseDictionaryMergeIterator = dictionaryMergeIterator;
Closeable dimValsMappedUnmapper = new Closeable()
{
@Override
@ -342,7 +347,6 @@ public class StringDimensionMergerV9 implements DimensionMergerV9<int[]>
spatialWriter.close();
}
log.info(
"Completed dim[%s] inverted with cardinality[%,d] in %,d millis.",
dimensionName,

View File

@ -31,7 +31,7 @@ public class DictionaryMergeIteratorTest
{
@Test
public void basicTest()
public void basicTest() throws Exception
{
// a b c d e f
String[] s1 = {"a", "c", "d", "e"}; // 0 2 3 4