Fix resource leaks (ComplexColumn and GenericColumn) (#3629)

* Remove unused ComplexColumnImpl class

* Remove throws IOException from close() in GenericColumn, ComplexColumn, IndexedFloats and IndexedLongs

* Use concise try-with-resources syntax in several places

* Fix resource leaks (ComplexColumn and GenericColumn) in SegmentAnalyzer, SearchQueryRunner, QueryableIndexIndexableAdapter and QueryableIndexStorageAdapter

* Use Closer in Iterable, returned from QueryableIndexIndexableAdapter.getRows(), in order to try to close everything even if closing some parts thew exceptions
This commit is contained in:
Roman Leventov 2016-11-01 21:53:52 -06:00 committed by Nishant
parent eb70a12e43
commit 4b0d6cf789
18 changed files with 102 additions and 178 deletions

View File

@ -306,35 +306,36 @@ public class SegmentAnalyzer
final String typeName final String typeName
) )
{ {
final ComplexColumn complexColumn = column != null ? column.getComplexColumn() : null; try (final ComplexColumn complexColumn = column != null ? column.getComplexColumn() : null) {
final boolean hasMultipleValues = capabilities != null && capabilities.hasMultipleValues(); final boolean hasMultipleValues = capabilities != null && capabilities.hasMultipleValues();
long size = 0; long size = 0;
if (analyzingSize() && complexColumn != null) { if (analyzingSize() && complexColumn != null) {
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) { if (serde == null) {
return ColumnAnalysis.error(String.format("unknown_complex_%s", typeName)); return ColumnAnalysis.error(String.format("unknown_complex_%s", typeName));
}
final Function<Object, Long> inputSizeFn = serde.inputSizeFn();
if (inputSizeFn == null) {
return new ColumnAnalysis(typeName, hasMultipleValues, 0, null, null, null, null);
}
final int length = column.getLength();
for (int i = 0; i < length; ++i) {
size += inputSizeFn.apply(complexColumn.getRowValue(i));
}
} }
final Function<Object, Long> inputSizeFn = serde.inputSizeFn(); return new ColumnAnalysis(
if (inputSizeFn == null) { typeName,
return new ColumnAnalysis(typeName, hasMultipleValues, 0, null, null, null, null); hasMultipleValues,
} size,
null,
final int length = column.getLength(); null,
for (int i = 0; i < length; ++i) { null,
size += inputSizeFn.apply(complexColumn.getRowValue(i)); null
} );
} }
return new ColumnAnalysis(
typeName,
hasMultipleValues,
size,
null,
null,
null,
null
);
} }
} }

View File

@ -121,18 +121,22 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
if (!interval.contains(segment.getDataInterval())) { if (!interval.contains(segment.getDataInterval())) {
MutableBitmap timeBitmap = bitmapFactory.makeEmptyMutableBitmap(); MutableBitmap timeBitmap = bitmapFactory.makeEmptyMutableBitmap();
final Column timeColumn = index.getColumn(Column.TIME_COLUMN_NAME); final Column timeColumn = index.getColumn(Column.TIME_COLUMN_NAME);
final GenericColumn timeValues = timeColumn.getGenericColumn(); try (final GenericColumn timeValues = timeColumn.getGenericColumn()) {
int startIndex = Math.max(0, getStartIndexOfTime(timeValues, interval.getStartMillis(), true)); int startIndex = Math.max(0, getStartIndexOfTime(timeValues, interval.getStartMillis(), true));
int endIndex = Math.min(timeValues.length() - 1, getStartIndexOfTime(timeValues, interval.getEndMillis(), false)); int endIndex = Math.min(
timeValues.length() - 1,
getStartIndexOfTime(timeValues, interval.getEndMillis(), false)
);
for (int i = startIndex; i <= endIndex; i++) { for (int i = startIndex; i <= endIndex; i++) {
timeBitmap.add(i); timeBitmap.add(i);
}
final ImmutableBitmap finalTimeBitmap = bitmapFactory.makeImmutableBitmap(timeBitmap);
timeFilteredBitmap =
(baseFilter == null) ? finalTimeBitmap : finalTimeBitmap.intersection(baseFilter);
} }
final ImmutableBitmap finalTimeBitmap = bitmapFactory.makeImmutableBitmap(timeBitmap);
timeFilteredBitmap =
(baseFilter == null) ? finalTimeBitmap : finalTimeBitmap.intersection(baseFilter);
} else { } else {
timeFilteredBitmap = baseFilter; timeFilteredBitmap = baseFilter;
} }

View File

@ -97,14 +97,9 @@ public class ColumnSelectorBitmapIndexSelector implements BitmapIndexSelector
@Override @Override
public int getNumRows() public int getNumRows()
{ {
GenericColumn column = null; try (final GenericColumn column = index.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn()) {
try {
column = index.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn();
return column.length(); return column.length();
} }
finally {
CloseQuietly.close(column);
}
} }
@Override @Override

View File

@ -24,6 +24,7 @@ import com.google.common.base.Function;
import com.google.common.collect.FluentIterable; import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.logger.Logger;
@ -178,8 +179,9 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
return new Iterator<Rowboat>() return new Iterator<Rowboat>()
{ {
final GenericColumn timestamps = input.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn(); final GenericColumn timestamps = input.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn();
final Object[] metrics; final Closeable[] metrics;
final Closeable[] columns; final Closeable[] columns;
final Closer closer = Closer.create();
final int numMetrics = getMetricNames().size(); final int numMetrics = getMetricNames().size();
@ -190,6 +192,8 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
boolean done = false; boolean done = false;
{ {
closer.register(timestamps);
handlerSet.toArray(handlers); handlerSet.toArray(handlers);
this.columns = FluentIterable this.columns = FluentIterable
.from(handlerSet) .from(handlerSet)
@ -204,9 +208,12 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
} }
} }
).toArray(Closeable.class); ).toArray(Closeable.class);
for (Closeable column : columns) {
closer.register(column);
}
final Indexed<String> availableMetrics = getMetricNames(); final Indexed<String> availableMetrics = getMetricNames();
metrics = new Object[availableMetrics.size()]; metrics = new Closeable[availableMetrics.size()];
for (int i = 0; i < metrics.length; ++i) { for (int i = 0; i < metrics.length; ++i) {
final Column column = input.getColumn(availableMetrics.get(i)); final Column column = input.getColumn(availableMetrics.get(i));
final ValueType type = column.getCapabilities().getType(); final ValueType type = column.getCapabilities().getType();
@ -222,6 +229,9 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
throw new ISE("Cannot handle type[%s]", type); throw new ISE("Cannot handle type[%s]", type);
} }
} }
for (Closeable metricColumn : metrics) {
closer.register(metricColumn);
}
} }
@Override @Override
@ -229,15 +239,7 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
{ {
final boolean hasNext = currRow < numRows; final boolean hasNext = currRow < numRows;
if (!hasNext && !done) { if (!hasNext && !done) {
CloseQuietly.close(timestamps); CloseQuietly.close(closer);
for (Object metric : metrics) {
if (metric instanceof Closeable) {
CloseQuietly.close((Closeable) metric);
}
}
for (Closeable dimension : columns) {
CloseQuietly.close(dimension);
}
done = true; done = true;
} }
return hasNext; return hasNext;
@ -315,8 +317,11 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
return "float"; return "float";
case LONG: case LONG:
return "long"; return "long";
case COMPLEX: case COMPLEX: {
return column.getComplexColumn().getTypeName(); try (ComplexColumn complexColumn = column.getComplexColumn() ) {
return complexColumn.getTypeName();
}
}
default: default:
throw new ISE("Unknown type[%s]", type); throw new ISE("Unknown type[%s]", type);
} }

View File

@ -28,13 +28,13 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import com.metamx.collections.bitmap.ImmutableBitmap; import com.metamx.collections.bitmap.ImmutableBitmap;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.math.expr.Expr;
import io.druid.math.expr.Parser;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.guava.Sequences;
import io.druid.math.expr.Expr;
import io.druid.math.expr.Parser;
import io.druid.query.QueryInterruptedException; import io.druid.query.QueryInterruptedException;
import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec; import io.druid.query.dimension.DimensionSpec;
@ -135,27 +135,17 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override @Override
public DateTime getMinTime() public DateTime getMinTime()
{ {
GenericColumn column = null; try (final GenericColumn column = index.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn()) {
try {
column = index.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn();
return new DateTime(column.getLongSingleValueRow(0)); return new DateTime(column.getLongSingleValueRow(0));
} }
finally {
CloseQuietly.close(column);
}
} }
@Override @Override
public DateTime getMaxTime() public DateTime getMaxTime()
{ {
GenericColumn column = null; try (final GenericColumn column = index.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn()) {
try {
column = index.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn();
return new DateTime(column.getLongSingleValueRow(column.length() - 1)); return new DateTime(column.getLongSingleValueRow(column.length() - 1));
} }
finally {
CloseQuietly.close(column);
}
} }
@Override @Override
@ -202,8 +192,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
public String getColumnTypeName(String columnName) public String getColumnTypeName(String columnName)
{ {
final Column column = index.getColumn(columnName); final Column column = index.getColumn(columnName);
final ComplexColumn complexColumn = column.getComplexColumn(); try (final ComplexColumn complexColumn = column.getComplexColumn()) {
return complexColumn != null ? complexColumn.getTypeName() : column.getCapabilities().getType().toString(); return complexColumn != null ? complexColumn.getTypeName() : column.getCapabilities().getType().toString();
}
} }
@Override @Override
@ -382,11 +373,13 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
final Map<String, DictionaryEncodedColumn> dictionaryColumnCache = Maps.newHashMap(); final Map<String, DictionaryEncodedColumn> dictionaryColumnCache = Maps.newHashMap();
final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap(); final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap();
final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap();
final Map<String, Object> objectColumnCache = Maps.newHashMap(); final Map<String, Object> objectColumnCache = Maps.newHashMap();
final GenericColumn timestamps = index.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn(); final GenericColumn timestamps = index.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn();
final Closer closer = Closer.create();
closer.register(timestamps);
Iterable<Long> iterable = gran.iterable(interval.getStartMillis(), interval.getEndMillis()); Iterable<Long> iterable = gran.iterable(interval.getStartMillis(), interval.getEndMillis());
if (descending) { if (descending) {
iterable = Lists.reverse(ImmutableList.copyOf(iterable)); iterable = Lists.reverse(ImmutableList.copyOf(iterable));
@ -471,6 +464,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
DictionaryEncodedColumn<String> cachedColumn = dictionaryColumnCache.get(dimension); DictionaryEncodedColumn<String> cachedColumn = dictionaryColumnCache.get(dimension);
if (cachedColumn == null) { if (cachedColumn == null) {
cachedColumn = columnDesc.getDictionaryEncoding(); cachedColumn = columnDesc.getDictionaryEncoding();
closer.register(cachedColumn);
dictionaryColumnCache.put(dimension, cachedColumn); dictionaryColumnCache.put(dimension, cachedColumn);
} }
@ -592,6 +586,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
if (holder != null && (holder.getCapabilities().getType() == ValueType.FLOAT if (holder != null && (holder.getCapabilities().getType() == ValueType.FLOAT
|| holder.getCapabilities().getType() == ValueType.LONG)) { || holder.getCapabilities().getType() == ValueType.LONG)) {
cachedMetricVals = holder.getGenericColumn(); cachedMetricVals = holder.getGenericColumn();
closer.register(cachedMetricVals);
genericColumnCache.put(columnName, cachedMetricVals); genericColumnCache.put(columnName, cachedMetricVals);
} }
} }
@ -628,6 +623,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
if (holder != null && (holder.getCapabilities().getType() == ValueType.LONG if (holder != null && (holder.getCapabilities().getType() == ValueType.LONG
|| holder.getCapabilities().getType() == ValueType.FLOAT)) { || holder.getCapabilities().getType() == ValueType.FLOAT)) {
cachedMetricVals = holder.getGenericColumn(); cachedMetricVals = holder.getGenericColumn();
closer.register(cachedMetricVals);
genericColumnCache.put(columnName, cachedMetricVals); genericColumnCache.put(columnName, cachedMetricVals);
} }
} }
@ -676,6 +672,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
} }
if (cachedColumnVals != null) { if (cachedColumnVals != null) {
closer.register((Closeable) cachedColumnVals);
objectColumnCache.put(column, cachedColumnVals); objectColumnCache.put(column, cachedColumnVals);
} }
} }
@ -822,6 +819,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
if (column == null) { if (column == null) {
continue; continue;
} }
closer.register(column);
if (column.getType() == ValueType.FLOAT) { if (column.getType() == ValueType.FLOAT) {
values.put( values.put(
columnName, new Supplier<Number>() columnName, new Supplier<Number>()
@ -1007,28 +1005,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
} }
} }
), ),
new Closeable() closer
{
@Override
public void close() throws IOException
{
CloseQuietly.close(timestamps);
for (DictionaryEncodedColumn column : dictionaryColumnCache.values()) {
CloseQuietly.close(column);
}
for (GenericColumn column : genericColumnCache.values()) {
CloseQuietly.close(column);
}
for (ComplexColumn complexColumn : complexColumnCache.values()) {
CloseQuietly.close(complexColumn);
}
for (Object column : objectColumnCache.values()) {
if (column instanceof Closeable) {
CloseQuietly.close((Closeable) column);
}
}
}
}
); );
} }
} }

View File

@ -28,4 +28,7 @@ public interface ComplexColumn extends Closeable
public Class<?> getClazz(); public Class<?> getClazz();
public String getTypeName(); public String getTypeName();
public Object getRowValue(int rowNum); public Object getRowValue(int rowNum);
@Override
void close();
} }

View File

@ -1,57 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.column;
import io.druid.segment.data.Indexed;
/**
*/
public class ComplexColumnImpl extends AbstractColumn
{
private static final ColumnCapabilitiesImpl CAPABILITIES = new ColumnCapabilitiesImpl()
.setType(ValueType.COMPLEX);
private final Indexed column;
private final String typeName;
public ComplexColumnImpl(String typeName, Indexed column)
{
this.column = column;
this.typeName = typeName;
}
@Override
public ColumnCapabilities getCapabilities()
{
return CAPABILITIES;
}
@Override
public int getLength()
{
return column.size();
}
@Override
public ComplexColumn getComplexColumn()
{
return new IndexedComplexColumn(typeName, column);
}
}

View File

@ -39,4 +39,7 @@ public interface GenericColumn extends Closeable
public IndexedFloats getFloatMultiValueRow(int rowNum); public IndexedFloats getFloatMultiValueRow(int rowNum);
public long getLongSingleValueRow(int rowNum); public long getLongSingleValueRow(int rowNum);
public IndexedLongs getLongMultiValueRow(int rowNum); public IndexedLongs getLongMultiValueRow(int rowNum);
@Override
void close();
} }

View File

@ -56,7 +56,7 @@ public class IndexedComplexColumn implements ComplexColumn
} }
@Override @Override
public void close() throws IOException public void close()
{ {
} }
} }

View File

@ -23,8 +23,6 @@ import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedFloats; import io.druid.segment.data.IndexedFloats;
import io.druid.segment.data.IndexedLongs; import io.druid.segment.data.IndexedLongs;
import java.io.IOException;
/** /**
*/ */
public class IndexedFloatsGenericColumn implements GenericColumn public class IndexedFloatsGenericColumn implements GenericColumn
@ -92,7 +90,7 @@ public class IndexedFloatsGenericColumn implements GenericColumn
} }
@Override @Override
public void close() throws IOException public void close()
{ {
column.close(); column.close();
} }

View File

@ -23,8 +23,6 @@ import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedFloats; import io.druid.segment.data.IndexedFloats;
import io.druid.segment.data.IndexedLongs; import io.druid.segment.data.IndexedLongs;
import java.io.IOException;
/** /**
*/ */
public class IndexedLongsGenericColumn implements GenericColumn public class IndexedLongsGenericColumn implements GenericColumn
@ -92,7 +90,7 @@ public class IndexedLongsGenericColumn implements GenericColumn
} }
@Override @Override
public void close() throws IOException public void close()
{ {
column.close(); column.close();
} }

View File

@ -20,7 +20,6 @@
package io.druid.segment.column; package io.druid.segment.column;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import io.druid.java.util.common.guava.CloseQuietly;
/** /**
*/ */
@ -62,14 +61,9 @@ class SimpleColumn implements Column
@Override @Override
public int getLength() public int getLength()
{ {
GenericColumn column = null; try (final GenericColumn column = genericColumn.get()) {
try {
column = genericColumn.get();
return column.length(); return column.length();
} }
finally {
CloseQuietly.close(column);
}
} }
@Override @Override

View File

@ -20,12 +20,10 @@
package io.druid.segment.data; package io.druid.segment.data;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.io.Closeables;
import com.google.common.primitives.Floats; import com.google.common.primitives.Floats;
import io.druid.collections.ResourceHolder; import io.druid.collections.ResourceHolder;
import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.CloseQuietly;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.nio.FloatBuffer; import java.nio.FloatBuffer;
@ -140,9 +138,11 @@ public class BlockLayoutIndexedFloatSupplier implements Supplier<IndexedFloats>
} }
@Override @Override
public void close() throws IOException public void close()
{ {
Closeables.close(holder, false); if (holder != null) {
holder.close();
}
} }
} }
} }

View File

@ -20,11 +20,9 @@
package io.druid.segment.data; package io.druid.segment.data;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.io.Closeables;
import io.druid.collections.ResourceHolder; import io.druid.collections.ResourceHolder;
import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.CloseQuietly;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.nio.LongBuffer; import java.nio.LongBuffer;
@ -175,9 +173,11 @@ public class BlockLayoutIndexedLongSupplier implements Supplier<IndexedLongs>
} }
@Override @Override
public void close() throws IOException public void close()
{ {
Closeables.close(holder, false); if (holder != null) {
holder.close();
}
} }
} }
} }

View File

@ -21,7 +21,6 @@ package io.druid.segment.data;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.nio.FloatBuffer; import java.nio.FloatBuffer;
@ -82,7 +81,7 @@ public class EntireLayoutIndexedFloatSupplier implements Supplier<IndexedFloats>
} }
@Override @Override
public void close() throws IOException public void close()
{ {
} }
} }

View File

@ -21,8 +21,6 @@ package io.druid.segment.data;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import java.io.IOException;
public class EntireLayoutIndexedLongSupplier implements Supplier<IndexedLongs> public class EntireLayoutIndexedLongSupplier implements Supplier<IndexedLongs>
{ {
@ -80,7 +78,7 @@ public class EntireLayoutIndexedLongSupplier implements Supplier<IndexedLongs>
} }
@Override @Override
public void close() throws IOException public void close()
{ {
} }
} }

View File

@ -29,4 +29,7 @@ public interface IndexedFloats extends Closeable
public int size(); public int size();
public float get(int index); public float get(int index);
public void fill(int index, float[] toFill); public void fill(int index, float[] toFill);
@Override
void close();
} }

View File

@ -29,4 +29,7 @@ public interface IndexedLongs extends Closeable
public int size(); public int size();
public long get(int index); public long get(int index);
public void fill(int index, long[] toFill); public void fill(int index, long[] toFill);
@Override
void close();
} }