diff --git a/indexing-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java b/indexing-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java index 24f6750785f..356b85f6aa1 100644 --- a/indexing-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java +++ b/indexing-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java @@ -134,6 +134,13 @@ public class IndexIO return handler.mapDir(inDir); } + @Deprecated + public static void unmapDir(File inDir) throws IOException + { + init(); + handler.close(inDir); + } + public static QueryableIndex loadIndex(File inDir) throws IOException { init(); @@ -148,6 +155,20 @@ public class IndexIO } } + public static void close(File inDir) throws IOException + { + init(); + final int version = getVersionFromDir(inDir); + + final IndexLoader loader = indexLoaders.get(version); + + if (loader != null) { + loader.close(inDir); + } else { + throw new ISE("Unknown index version[%s]", version); + } + } + public static void storeLatest(Index index, File file) throws IOException { handler.storeLatest(index, file); @@ -261,6 +282,8 @@ public class IndexIO public MMappedIndex mapDir(File inDir) throws IOException; + public void close(File inDir) throws IOException; + /** * This only exists for some legacy compatibility reasons, Metamarkets is working on getting rid of it in * future versions. Normal persisting of indexes is done via IndexMerger. @@ -383,6 +406,14 @@ public class IndexIO return retVal; } + @Override + public void close(File inDir) throws IOException + { + if (canBeMapped(inDir)) { + Smoosh.close(inDir); + } + } + @Override public void storeLatest(Index index, File file) { @@ -680,6 +711,8 @@ public class IndexIO static interface IndexLoader { public QueryableIndex load(File inDir) throws IOException; + + public void close(File inDir) throws IOException; } static class LegacyIndexLoader implements IndexLoader @@ -764,6 +797,12 @@ public class IndexIO columns ); } + + @Override + public void close(File inDir) throws IOException + { + IndexIO.unmapDir(inDir); + } } static class V9IndexLoader implements IndexLoader @@ -803,6 +842,12 @@ public class IndexIO return index; } + @Override + public void close(File inDir) throws IOException + { + Smoosh.close(inDir); + } + private Column deserializeColumn(ObjectMapper mapper, ByteBuffer byteBuffer) throws IOException { ColumnDescriptor serde = mapper.readValue( diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java index 1ccf32e0795..dda7afe7d78 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java @@ -331,11 +331,13 @@ public class WorkerNode extends QueryableNode { if (serviceDiscovery == null) { final ServiceDiscoveryConfig config = getConfigFactory().build(ServiceDiscoveryConfig.class); - this.serviceDiscovery = Initialization.makeServiceDiscoveryClient( - getCuratorFramework(), + final CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework( config, getLifecycle() ); + this.serviceDiscovery = Initialization.makeServiceDiscoveryClient( + serviceDiscoveryCuratorFramework, config, getLifecycle() + ); } if (coordinatorServiceProvider == null) { this.coordinatorServiceProvider = Initialization.makeServiceProvider( diff --git a/pom.xml b/pom.xml index 01e9a2a36bc..17335349014 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,7 @@ UTF-8 - 0.22.3 + 0.22.5-SNAPSHOT 2.1.0-incubating diff --git a/server/src/main/java/com/metamx/druid/loading/MMappedQueryableIndexFactory.java b/server/src/main/java/com/metamx/druid/loading/MMappedQueryableIndexFactory.java index d2cef9b76b3..842a62851bb 100644 --- a/server/src/main/java/com/metamx/druid/loading/MMappedQueryableIndexFactory.java +++ b/server/src/main/java/com/metamx/druid/loading/MMappedQueryableIndexFactory.java @@ -69,4 +69,10 @@ public class MMappedQueryableIndexFactory implements QueryableIndexFactory throw new SegmentLoadingException(e, "%s", e.getMessage()); } } + + @Override + public void close(File parentDir) throws IOException + { + IndexIO.close(parentDir); + } } diff --git a/server/src/main/java/com/metamx/druid/loading/QueryableIndexFactory.java b/server/src/main/java/com/metamx/druid/loading/QueryableIndexFactory.java index 276bbc2028a..e4bed51cbd3 100644 --- a/server/src/main/java/com/metamx/druid/loading/QueryableIndexFactory.java +++ b/server/src/main/java/com/metamx/druid/loading/QueryableIndexFactory.java @@ -22,10 +22,13 @@ package com.metamx.druid.loading; import com.metamx.druid.index.QueryableIndex; import java.io.File; +import java.io.IOException; /** */ public interface QueryableIndexFactory { public QueryableIndex factorize(File parentDir) throws SegmentLoadingException; + + public void close(File parentDir) throws IOException; } diff --git a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java index f0e9a7f20e8..483b91bd4cd 100644 --- a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java +++ b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java @@ -170,6 +170,8 @@ public class SingleSegmentLoader implements SegmentLoader log.info("Deleting directory[%s]", cacheFile); FileUtils.deleteDirectory(cacheFile); loc.removeSegment(segment); + + factory.close(cacheFile); } catch (IOException e) { throw new SegmentLoadingException(e, e.getMessage());