mirror of https://github.com/apache/druid.git
fix leak with files not cleaning up correctly
This commit is contained in:
parent
10b7ca9fa9
commit
3ef21bfc66
|
@ -22,7 +22,6 @@ package io.druid.segment.loading;
|
||||||
import com.metamx.common.logger.Logger;
|
import com.metamx.common.logger.Logger;
|
||||||
import io.druid.segment.IndexIO;
|
import io.druid.segment.IndexIO;
|
||||||
import io.druid.segment.QueryableIndex;
|
import io.druid.segment.QueryableIndex;
|
||||||
import org.apache.commons.io.FileUtils;
|
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -40,13 +39,6 @@ public class MMappedQueryableIndexFactory implements QueryableIndexFactory
|
||||||
return IndexIO.loadIndex(parentDir);
|
return IndexIO.loadIndex(parentDir);
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (IOException e) {
|
||||||
log.warn(e, "Got exception!!!! Going to delete parentDir[%s]", parentDir);
|
|
||||||
try {
|
|
||||||
FileUtils.deleteDirectory(parentDir);
|
|
||||||
}
|
|
||||||
catch (IOException e2) {
|
|
||||||
log.error(e, "Problem deleting parentDir[%s]", parentDir);
|
|
||||||
}
|
|
||||||
throw new SegmentLoadingException(e, "%s", e.getMessage());
|
throw new SegmentLoadingException(e, "%s", e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,6 +48,8 @@ public class OmniSegmentLoader implements SegmentLoader
|
||||||
|
|
||||||
private final List<StorageLocation> locations;
|
private final List<StorageLocation> locations;
|
||||||
|
|
||||||
|
private final Object lock = new Object();
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public OmniSegmentLoader(
|
public OmniSegmentLoader(
|
||||||
Map<String, DataSegmentPuller> pullers,
|
Map<String, DataSegmentPuller> pullers,
|
||||||
|
@ -118,16 +120,35 @@ public class OmniSegmentLoader implements SegmentLoader
|
||||||
}
|
}
|
||||||
|
|
||||||
File storageDir = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
|
File storageDir = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
|
||||||
|
|
||||||
|
// We use a marker to prevent the case where a segment is downloaded, but before the download completes,
|
||||||
|
// the parent directories of the segment are removed
|
||||||
|
final File downloadStartMarker = new File(storageDir, "downloadStartMarker");
|
||||||
|
synchronized (lock) {
|
||||||
if (!storageDir.mkdirs()) {
|
if (!storageDir.mkdirs()) {
|
||||||
log.debug("Unable to make parent file[%s]", storageDir);
|
log.debug("Unable to make parent file[%s]", storageDir);
|
||||||
}
|
}
|
||||||
|
try {
|
||||||
|
downloadStartMarker.createNewFile();
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new SegmentLoadingException("Unable to create marker file for [%s]", storageDir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
getPuller(segment.getLoadSpec()).getSegmentFiles(segment, storageDir);
|
getPuller(segment.getLoadSpec()).getSegmentFiles(segment, storageDir);
|
||||||
|
|
||||||
|
try {
|
||||||
|
FileUtils.deleteDirectory(downloadStartMarker);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new SegmentLoadingException("Unable to remove marker file for [%s]", storageDir);
|
||||||
|
}
|
||||||
|
|
||||||
loc.addSegment(segment);
|
loc.addSegment(segment);
|
||||||
|
|
||||||
retVal = storageDir;
|
retVal = storageDir;
|
||||||
}
|
} else {
|
||||||
else {
|
|
||||||
retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
|
retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -151,9 +172,10 @@ public class OmniSegmentLoader implements SegmentLoader
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
// Druid creates folders of the form dataSource/interval/version/partitionNum.
|
||||||
|
// We need to clean up all these directories if they are all empty.
|
||||||
File cacheFile = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
|
File cacheFile = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
|
||||||
log.info("Deleting directory[%s]", cacheFile);
|
cleanupCacheFiles(loc.getPath(), cacheFile);
|
||||||
FileUtils.deleteDirectory(cacheFile);
|
|
||||||
loc.removeSegment(segment);
|
loc.removeSegment(segment);
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (IOException e) {
|
||||||
|
@ -172,4 +194,25 @@ public class OmniSegmentLoader implements SegmentLoader
|
||||||
|
|
||||||
return loader;
|
return loader;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void cleanupCacheFiles(File baseFile, File cacheFile) throws IOException
|
||||||
|
{
|
||||||
|
if (cacheFile.equals(baseFile)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized (lock) {
|
||||||
|
log.info("Deleting directory[%s]", cacheFile);
|
||||||
|
try {
|
||||||
|
FileUtils.deleteDirectory(cacheFile);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error("Unable to remove file[%s]", cacheFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cacheFile.getParentFile() != null && cacheFile.getParentFile().listFiles().length == 0) {
|
||||||
|
cleanupCacheFiles(baseFile, cacheFile.getParentFile());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue