diff --git a/server/src/main/java/io/druid/segment/loading/MMappedQueryableIndexFactory.java b/server/src/main/java/io/druid/segment/loading/MMappedQueryableIndexFactory.java index 717b0b513a0..81dd8d3136f 100644 --- a/server/src/main/java/io/druid/segment/loading/MMappedQueryableIndexFactory.java +++ b/server/src/main/java/io/druid/segment/loading/MMappedQueryableIndexFactory.java @@ -22,7 +22,6 @@ package io.druid.segment.loading; import com.metamx.common.logger.Logger; import io.druid.segment.IndexIO; import io.druid.segment.QueryableIndex; -import org.apache.commons.io.FileUtils; import java.io.File; import java.io.IOException; @@ -40,13 +39,6 @@ public class MMappedQueryableIndexFactory implements QueryableIndexFactory return IndexIO.loadIndex(parentDir); } catch (IOException e) { - log.warn(e, "Got exception!!!! Going to delete parentDir[%s]", parentDir); - try { - FileUtils.deleteDirectory(parentDir); - } - catch (IOException e2) { - log.error(e, "Problem deleting parentDir[%s]", parentDir); - } throw new SegmentLoadingException(e, "%s", e.getMessage()); } } diff --git a/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java b/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java index bfec5093392..d67695d5376 100644 --- a/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java +++ b/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java @@ -48,6 +48,8 @@ public class OmniSegmentLoader implements SegmentLoader private final List locations; + private final Object lock = new Object(); + @Inject public OmniSegmentLoader( Map pullers, @@ -118,16 +120,35 @@ public class OmniSegmentLoader implements SegmentLoader } File storageDir = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); - if (!storageDir.mkdirs()) { - log.debug("Unable to make parent file[%s]", storageDir); + + // We use a marker to prevent the case where a segment is downloaded, but before the download completes, + // the parent directories of the segment are removed + final File downloadStartMarker = new File(storageDir, "downloadStartMarker"); + synchronized (lock) { + if (!storageDir.mkdirs()) { + log.debug("Unable to make parent file[%s]", storageDir); + } + try { + downloadStartMarker.createNewFile(); + } + catch (IOException e) { + throw new SegmentLoadingException("Unable to create marker file for [%s]", storageDir); + } } getPuller(segment.getLoadSpec()).getSegmentFiles(segment, storageDir); + + try { + FileUtils.deleteDirectory(downloadStartMarker); + } + catch (Exception e) { + throw new SegmentLoadingException("Unable to remove marker file for [%s]", storageDir); + } + loc.addSegment(segment); retVal = storageDir; - } - else { + } else { retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); } @@ -151,9 +172,10 @@ public class OmniSegmentLoader implements SegmentLoader } try { + // Druid creates folders of the form dataSource/interval/version/partitionNum. + // We need to clean up all these directories if they are all empty. File cacheFile = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); - log.info("Deleting directory[%s]", cacheFile); - FileUtils.deleteDirectory(cacheFile); + cleanupCacheFiles(loc.getPath(), cacheFile); loc.removeSegment(segment); } catch (IOException e) { @@ -172,4 +194,25 @@ public class OmniSegmentLoader implements SegmentLoader return loader; } + + public void cleanupCacheFiles(File baseFile, File cacheFile) throws IOException + { + if (cacheFile.equals(baseFile)) { + return; + } + + synchronized (lock) { + log.info("Deleting directory[%s]", cacheFile); + try { + FileUtils.deleteDirectory(cacheFile); + } + catch (Exception e) { + log.error("Unable to remove file[%s]", cacheFile); + } + } + + if (cacheFile.getParentFile() != null && cacheFile.getParentFile().listFiles().length == 0) { + cleanupCacheFiles(baseFile, cacheFile.getParentFile()); + } + } }