fix according to code review

This commit is contained in:
fjy 2013-07-29 18:39:14 -07:00
parent 1226190279
commit f55e12040f
8 changed files with 47 additions and 63 deletions

View File

@ -23,9 +23,11 @@ import com.metamx.druid.index.column.ColumnSelector;
import com.metamx.druid.kv.Indexed; import com.metamx.druid.kv.Indexed;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.io.Closeable;
/** /**
*/ */
public interface QueryableIndex extends ColumnSelector public interface QueryableIndex extends ColumnSelector, Closeable
{ {
public Interval getDataInterval(); public Interval getDataInterval();
public int getNumRows(); public int getNumRows();

View File

@ -19,10 +19,12 @@
package com.metamx.druid.index; package com.metamx.druid.index;
import com.metamx.common.io.smoosh.SmooshedFileMapper;
import com.metamx.druid.index.column.Column; import com.metamx.druid.index.column.Column;
import com.metamx.druid.kv.Indexed; import com.metamx.druid.kv.Indexed;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.io.IOException;
import java.util.Map; import java.util.Map;
/** /**
@ -34,13 +36,15 @@ public class SimpleQueryableIndex implements QueryableIndex
private final Indexed<String> availableDimensions; private final Indexed<String> availableDimensions;
private final Column timeColumn; private final Column timeColumn;
private final Map<String, Column> otherColumns; private final Map<String, Column> otherColumns;
private final SmooshedFileMapper fileMapper;
public SimpleQueryableIndex( public SimpleQueryableIndex(
Interval dataInterval, Interval dataInterval,
Indexed<String> columnNames, Indexed<String> columnNames,
Indexed<String> dimNames, Indexed<String> dimNames,
Column timeColumn, Column timeColumn,
Map<String, Column> otherColumns Map<String, Column> otherColumns,
SmooshedFileMapper fileMapper
) )
{ {
this.dataInterval = dataInterval; this.dataInterval = dataInterval;
@ -48,6 +52,7 @@ public class SimpleQueryableIndex implements QueryableIndex
this.availableDimensions = dimNames; this.availableDimensions = dimNames;
this.timeColumn = timeColumn; this.timeColumn = timeColumn;
this.otherColumns = otherColumns; this.otherColumns = otherColumns;
this.fileMapper = fileMapper;
} }
@Override @Override
@ -85,4 +90,10 @@ public class SimpleQueryableIndex implements QueryableIndex
{ {
return otherColumns.get(columnName); return otherColumns.get(columnName);
} }
@Override
public void close() throws IOException
{
fileMapper.close();
}
} }

View File

@ -134,13 +134,6 @@ public class IndexIO
return handler.mapDir(inDir); return handler.mapDir(inDir);
} }
@Deprecated
public static void unmapDir(File inDir) throws IOException
{
init();
handler.close(inDir);
}
public static QueryableIndex loadIndex(File inDir) throws IOException public static QueryableIndex loadIndex(File inDir) throws IOException
{ {
init(); init();
@ -155,20 +148,6 @@ public class IndexIO
} }
} }
public static void close(File inDir) throws IOException
{
init();
final int version = getVersionFromDir(inDir);
final IndexLoader loader = indexLoaders.get(version);
if (loader != null) {
loader.close(inDir);
} else {
throw new ISE("Unknown index version[%s]", version);
}
}
public static void storeLatest(Index index, File file) throws IOException public static void storeLatest(Index index, File file) throws IOException
{ {
handler.storeLatest(index, file); handler.storeLatest(index, file);
@ -282,8 +261,6 @@ public class IndexIO
public MMappedIndex mapDir(File inDir) throws IOException; public MMappedIndex mapDir(File inDir) throws IOException;
public void close(File inDir) throws IOException;
/** /**
* This only exists for some legacy compatibility reasons, Metamarkets is working on getting rid of it in * This only exists for some legacy compatibility reasons, Metamarkets is working on getting rid of it in
* future versions. Normal persisting of indexes is done via IndexMerger. * future versions. Normal persisting of indexes is done via IndexMerger.
@ -398,7 +375,8 @@ public class IndexIO
dimValueLookups, dimValueLookups,
dimColumns, dimColumns,
invertedIndexed, invertedIndexed,
spatialIndexed spatialIndexed,
smooshedFiles
); );
log.debug("Mapped v8 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime); log.debug("Mapped v8 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime);
@ -406,14 +384,6 @@ public class IndexIO
return retVal; return retVal;
} }
@Override
public void close(File inDir) throws IOException
{
if (canBeMapped(inDir)) {
Smoosh.close(inDir);
}
}
@Override @Override
public void storeLatest(Index index, File file) public void storeLatest(Index index, File file)
{ {
@ -711,8 +681,6 @@ public class IndexIO
static interface IndexLoader static interface IndexLoader
{ {
public QueryableIndex load(File inDir) throws IOException; public QueryableIndex load(File inDir) throws IOException;
public void close(File inDir) throws IOException;
} }
static class LegacyIndexLoader implements IndexLoader static class LegacyIndexLoader implements IndexLoader
@ -794,15 +762,10 @@ public class IndexIO
.setType(ValueType.LONG) .setType(ValueType.LONG)
.setGenericColumn(new LongGenericColumnSupplier(index.timestamps)) .setGenericColumn(new LongGenericColumnSupplier(index.timestamps))
.build(), .build(),
columns columns,
index.getFileMapper()
); );
} }
@Override
public void close(File inDir) throws IOException
{
IndexIO.unmapDir(inDir);
}
} }
static class V9IndexLoader implements IndexLoader static class V9IndexLoader implements IndexLoader
@ -834,7 +797,7 @@ public class IndexIO
} }
final QueryableIndex index = new SimpleQueryableIndex( final QueryableIndex index = new SimpleQueryableIndex(
dataInterval, cols, dims, deserializeColumn(mapper, smooshedFiles.mapFile("__time")), columns dataInterval, cols, dims, deserializeColumn(mapper, smooshedFiles.mapFile("__time")), columns, smooshedFiles
); );
log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime); log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime);
@ -842,12 +805,6 @@ public class IndexIO
return index; return index;
} }
@Override
public void close(File inDir) throws IOException
{
Smoosh.close(inDir);
}
private Column deserializeColumn(ObjectMapper mapper, ByteBuffer byteBuffer) throws IOException private Column deserializeColumn(ObjectMapper mapper, ByteBuffer byteBuffer) throws IOException
{ {
ColumnDescriptor serde = mapper.readValue( ColumnDescriptor serde = mapper.readValue(

View File

@ -24,18 +24,19 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.metamx.collections.spatial.ImmutableRTree; import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.common.io.smoosh.SmooshedFileMapper;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.kv.ConciseCompressedIndexedInts; import com.metamx.druid.kv.ConciseCompressedIndexedInts;
import com.metamx.druid.kv.GenericIndexed; import com.metamx.druid.kv.GenericIndexed;
import com.metamx.druid.kv.Indexed; import com.metamx.druid.kv.Indexed;
import com.metamx.druid.kv.IndexedList; import com.metamx.druid.kv.IndexedList;
import com.metamx.druid.kv.IndexedLongs; import com.metamx.druid.kv.IndexedLongs;
import com.metamx.druid.kv.IndexedRTree;
import com.metamx.druid.kv.VSizeIndexed; import com.metamx.druid.kv.VSizeIndexed;
import com.metamx.druid.kv.VSizeIndexedInts; import com.metamx.druid.kv.VSizeIndexedInts;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet; import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.io.IOException;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.nio.LongBuffer; import java.nio.LongBuffer;
import java.util.Arrays; import java.util.Arrays;
@ -57,6 +58,7 @@ public class MMappedIndex
final Map<String, VSizeIndexed> dimColumns; final Map<String, VSizeIndexed> dimColumns;
final Map<String, GenericIndexed<ImmutableConciseSet>> invertedIndexes; final Map<String, GenericIndexed<ImmutableConciseSet>> invertedIndexes;
final Map<String, ImmutableRTree> spatialIndexes; final Map<String, ImmutableRTree> spatialIndexes;
final SmooshedFileMapper fileMapper;
private final Map<String, Integer> metricIndexes = Maps.newHashMap(); private final Map<String, Integer> metricIndexes = Maps.newHashMap();
@ -69,7 +71,8 @@ public class MMappedIndex
Map<String, GenericIndexed<String>> dimValueLookups, Map<String, GenericIndexed<String>> dimValueLookups,
Map<String, VSizeIndexed> dimColumns, Map<String, VSizeIndexed> dimColumns,
Map<String, GenericIndexed<ImmutableConciseSet>> invertedIndexes, Map<String, GenericIndexed<ImmutableConciseSet>> invertedIndexes,
Map<String, ImmutableRTree> spatialIndexes Map<String, ImmutableRTree> spatialIndexes,
SmooshedFileMapper fileMapper
) )
{ {
this.availableDimensions = availableDimensions; this.availableDimensions = availableDimensions;
@ -81,6 +84,7 @@ public class MMappedIndex
this.dimColumns = dimColumns; this.dimColumns = dimColumns;
this.invertedIndexes = invertedIndexes; this.invertedIndexes = invertedIndexes;
this.spatialIndexes = spatialIndexes; this.spatialIndexes = spatialIndexes;
this.fileMapper = fileMapper;
for (int i = 0; i < availableMetrics.size(); i++) { for (int i = 0; i < availableMetrics.size(); i++) {
metricIndexes.put(availableMetrics.get(i), i); metricIndexes.put(availableMetrics.get(i), i);
@ -169,6 +173,18 @@ public class MMappedIndex
return (retVal == null) ? emptySet : retVal; return (retVal == null) ? emptySet : retVal;
} }
public SmooshedFileMapper getFileMapper()
{
return fileMapper;
}
public void close() throws IOException
{
if (fileMapper != null) {
fileMapper.close();
}
}
public static MMappedIndex fromIndex(Index index) public static MMappedIndex fromIndex(Index index)
{ {
log.info("Converting timestamps"); log.info("Converting timestamps");
@ -273,7 +289,8 @@ public class MMappedIndex
dimValueLookups, dimValueLookups,
dimColumns, dimColumns,
invertedIndexes, invertedIndexes,
spatialIndexes spatialIndexes,
null
); );
} }
} }

View File

@ -183,6 +183,13 @@ public class ServerManager implements QuerySegmentWalker
synchronized (dataSourceCounts) { synchronized (dataSourceCounts) {
dataSourceCounts.add(dataSource, -1L); dataSourceCounts.add(dataSource, -1L);
} }
try {
oldQueryable.asQueryableIndex().close();
}
catch (Exception e) {
log.error("Unable to close queryable index %s", oldQueryable.getIdentifier());
}
} else { } else {
log.info( log.info(
"Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.", "Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.",

View File

@ -69,10 +69,4 @@ public class MMappedQueryableIndexFactory implements QueryableIndexFactory
throw new SegmentLoadingException(e, "%s", e.getMessage()); throw new SegmentLoadingException(e, "%s", e.getMessage());
} }
} }
@Override
public void close(File parentDir) throws IOException
{
IndexIO.close(parentDir);
}
} }

View File

@ -29,6 +29,4 @@ import java.io.IOException;
public interface QueryableIndexFactory public interface QueryableIndexFactory
{ {
public QueryableIndex factorize(File parentDir) throws SegmentLoadingException; public QueryableIndex factorize(File parentDir) throws SegmentLoadingException;
public void close(File parentDir) throws IOException;
} }

View File

@ -170,8 +170,6 @@ public class SingleSegmentLoader implements SegmentLoader
log.info("Deleting directory[%s]", cacheFile); log.info("Deleting directory[%s]", cacheFile);
FileUtils.deleteDirectory(cacheFile); FileUtils.deleteDirectory(cacheFile);
loc.removeSegment(segment); loc.removeSegment(segment);
factory.close(cacheFile);
} }
catch (IOException e) { catch (IOException e) {
throw new SegmentLoadingException(e, e.getMessage()); throw new SegmentLoadingException(e, e.getMessage());