add the ability to close mmapped files

This commit is contained in:
fjy 2013-07-29 15:22:34 -07:00
parent b0090a1de6
commit 1226190279
6 changed files with 61 additions and 3 deletions

View File

@ -134,6 +134,13 @@ public class IndexIO
return handler.mapDir(inDir); return handler.mapDir(inDir);
} }
@Deprecated
public static void unmapDir(File inDir) throws IOException
{
init();
handler.close(inDir);
}
public static QueryableIndex loadIndex(File inDir) throws IOException public static QueryableIndex loadIndex(File inDir) throws IOException
{ {
init(); init();
@ -148,6 +155,20 @@ public class IndexIO
} }
} }
public static void close(File inDir) throws IOException
{
init();
final int version = getVersionFromDir(inDir);
final IndexLoader loader = indexLoaders.get(version);
if (loader != null) {
loader.close(inDir);
} else {
throw new ISE("Unknown index version[%s]", version);
}
}
public static void storeLatest(Index index, File file) throws IOException public static void storeLatest(Index index, File file) throws IOException
{ {
handler.storeLatest(index, file); handler.storeLatest(index, file);
@ -261,6 +282,8 @@ public class IndexIO
public MMappedIndex mapDir(File inDir) throws IOException; public MMappedIndex mapDir(File inDir) throws IOException;
public void close(File inDir) throws IOException;
/** /**
* This only exists for some legacy compatibility reasons, Metamarkets is working on getting rid of it in * This only exists for some legacy compatibility reasons, Metamarkets is working on getting rid of it in
* future versions. Normal persisting of indexes is done via IndexMerger. * future versions. Normal persisting of indexes is done via IndexMerger.
@ -383,6 +406,14 @@ public class IndexIO
return retVal; return retVal;
} }
@Override
public void close(File inDir) throws IOException
{
if (canBeMapped(inDir)) {
Smoosh.close(inDir);
}
}
@Override @Override
public void storeLatest(Index index, File file) public void storeLatest(Index index, File file)
{ {
@ -680,6 +711,8 @@ public class IndexIO
static interface IndexLoader static interface IndexLoader
{ {
public QueryableIndex load(File inDir) throws IOException; public QueryableIndex load(File inDir) throws IOException;
public void close(File inDir) throws IOException;
} }
static class LegacyIndexLoader implements IndexLoader static class LegacyIndexLoader implements IndexLoader
@ -764,6 +797,12 @@ public class IndexIO
columns columns
); );
} }
@Override
public void close(File inDir) throws IOException
{
IndexIO.unmapDir(inDir);
}
} }
static class V9IndexLoader implements IndexLoader static class V9IndexLoader implements IndexLoader
@ -803,6 +842,12 @@ public class IndexIO
return index; return index;
} }
@Override
public void close(File inDir) throws IOException
{
Smoosh.close(inDir);
}
private Column deserializeColumn(ObjectMapper mapper, ByteBuffer byteBuffer) throws IOException private Column deserializeColumn(ObjectMapper mapper, ByteBuffer byteBuffer) throws IOException
{ {
ColumnDescriptor serde = mapper.readValue( ColumnDescriptor serde = mapper.readValue(

View File

@ -331,11 +331,13 @@ public class WorkerNode extends QueryableNode<WorkerNode>
{ {
if (serviceDiscovery == null) { if (serviceDiscovery == null) {
final ServiceDiscoveryConfig config = getConfigFactory().build(ServiceDiscoveryConfig.class); final ServiceDiscoveryConfig config = getConfigFactory().build(ServiceDiscoveryConfig.class);
this.serviceDiscovery = Initialization.makeServiceDiscoveryClient( final CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework(
getCuratorFramework(),
config, config,
getLifecycle() getLifecycle()
); );
this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
serviceDiscoveryCuratorFramework, config, getLifecycle()
);
} }
if (coordinatorServiceProvider == null) { if (coordinatorServiceProvider == null) {
this.coordinatorServiceProvider = Initialization.makeServiceProvider( this.coordinatorServiceProvider = Initialization.makeServiceProvider(

View File

@ -38,7 +38,7 @@
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.22.3</metamx.java-util.version> <metamx.java-util.version>0.22.5-SNAPSHOT</metamx.java-util.version>
<apache.curator.version>2.1.0-incubating</apache.curator.version> <apache.curator.version>2.1.0-incubating</apache.curator.version>
</properties> </properties>

View File

@ -69,4 +69,10 @@ public class MMappedQueryableIndexFactory implements QueryableIndexFactory
throw new SegmentLoadingException(e, "%s", e.getMessage()); throw new SegmentLoadingException(e, "%s", e.getMessage());
} }
} }
@Override
public void close(File parentDir) throws IOException
{
IndexIO.close(parentDir);
}
} }

View File

@ -22,10 +22,13 @@ package com.metamx.druid.loading;
import com.metamx.druid.index.QueryableIndex; import com.metamx.druid.index.QueryableIndex;
import java.io.File; import java.io.File;
import java.io.IOException;
/** /**
*/ */
public interface QueryableIndexFactory public interface QueryableIndexFactory
{ {
public QueryableIndex factorize(File parentDir) throws SegmentLoadingException; public QueryableIndex factorize(File parentDir) throws SegmentLoadingException;
public void close(File parentDir) throws IOException;
} }

View File

@ -170,6 +170,8 @@ public class SingleSegmentLoader implements SegmentLoader
log.info("Deleting directory[%s]", cacheFile); log.info("Deleting directory[%s]", cacheFile);
FileUtils.deleteDirectory(cacheFile); FileUtils.deleteDirectory(cacheFile);
loc.removeSegment(segment); loc.removeSegment(segment);
factory.close(cacheFile);
} }
catch (IOException e) { catch (IOException e) {
throw new SegmentLoadingException(e, e.getMessage()); throw new SegmentLoadingException(e, e.getMessage());