diff --git a/server/pom.xml b/server/pom.xml
index 65f6b5b855d..1424bb6d375 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -254,6 +254,12 @@
1.0.4
test
+
+ com.google.jimfs
+ jimfs
+ 1.1
+ test
+
diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java
index b739a842fb5..d2877638afe 100644
--- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java
@@ -28,10 +28,14 @@ import io.druid.java.util.emitter.EmittingLogger;
import io.druid.segment.IndexIO;
import io.druid.segment.Segment;
import io.druid.timeline.DataSegment;
-import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -53,7 +57,8 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
private static final Comparator COMPARATOR = new Comparator()
{
- @Override public int compare(StorageLocation left, StorageLocation right)
+ @Override
+ public int compare(StorageLocation left, StorageLocation right)
{
return Longs.compare(right.available(), left.available());
}
@@ -73,7 +78,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
this.locations = Lists.newArrayList();
for (StorageLocationConfig locationConfig : config.getLocations()) {
locations.add(new StorageLocation(
- locationConfig.getPath(),
+ locationConfig.getPath().toPath(),
locationConfig.getMaxSize(),
locationConfig.getFreeSpacePercent()
));
@@ -94,8 +99,8 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
private StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
{
for (StorageLocation location : getSortedList(locations)) {
- File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
- if (localStorageDir.exists()) {
+ Path localStorageDir = location.getPath().resolve(DataSegmentPusher.getDefaultStorageDir(segment, false));
+ if (Files.exists(localStorageDir)) {
return location;
}
}
@@ -133,7 +138,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
loc = loadSegmentWithRetry(segment, storageDir);
}
loc.addSegment(segment);
- return new File(loc.getPath(), storageDir);
+ return loc.getPath().resolve(storageDir).toFile();
}
/**
@@ -145,7 +150,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
{
for (StorageLocation loc : getSortedList(locations)) {
if (loc.canHandle(segment)) {
- File storageDir = new File(loc.getPath(), storageDirStr);
+ Path storageDir = loc.getPath().resolve(storageDirStr);
try {
loadInLocationWithStartMarker(segment, storageDir);
@@ -155,9 +160,9 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
log.makeAlert(
e,
"Failed to load segment in current location %s, try next location if any",
- loc.getPath().getAbsolutePath()
+ loc.getPath().toAbsolutePath()
)
- .addData("location", loc.getPath().getAbsolutePath())
+ .addData("location", loc.getPath().toAbsolutePath())
.emit();
cleanupCacheFiles(loc.getPath(), storageDir);
@@ -167,19 +172,20 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
throw new SegmentLoadingException("Failed to load segment %s in all locations.", segment.getIdentifier());
}
- private void loadInLocationWithStartMarker(DataSegment segment, File storageDir) throws SegmentLoadingException
+ private void loadInLocationWithStartMarker(DataSegment segment, Path storageDir) throws SegmentLoadingException
{
// We use a marker to prevent the case where a segment is downloaded, but before the download completes,
// the parent directories of the segment are removed
- final File downloadStartMarker = new File(storageDir, "downloadStartMarker");
+ final Path downloadStartMarker = storageDir.resolve("downloadStartMarker");
synchronized (lock) {
- if (!storageDir.mkdirs()) {
- log.debug("Unable to make parent file[%s]", storageDir);
+ try {
+ Files.createDirectories(storageDir);
+ }
+ catch (IOException e) {
+ log.warn(e, "Unable to make parent file[%s]", storageDir);
}
try {
- if (!downloadStartMarker.createNewFile()) {
- throw new SegmentLoadingException("Was not able to create new download marker for [%s]", storageDir);
- }
+ Files.createFile(downloadStartMarker);
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Unable to create marker file for [%s]", storageDir);
@@ -187,17 +193,20 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
}
loadInLocation(segment, storageDir);
- if (!downloadStartMarker.delete()) {
+ try {
+ Files.delete(downloadStartMarker);
+ }
+ catch (IOException e) {
throw new SegmentLoadingException("Unable to remove marker file for [%s]", storageDir);
}
}
- private void loadInLocation(DataSegment segment, File storageDir) throws SegmentLoadingException
+ private void loadInLocation(DataSegment segment, Path storageDir) throws SegmentLoadingException
{
// LoadSpec isn't materialized until here so that any system can interpret Segment without having to have all the
// LoadSpec dependencies.
final LoadSpec loadSpec = jsonMapper.convertValue(segment.getLoadSpec(), LoadSpec.class);
- final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir);
+ final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir.toFile());
if (result.getSize() != segment.getSize()) {
log.warn(
"Segment [%s] is different than expected size. Expected [%d] found [%d]",
@@ -226,8 +235,8 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
// in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not.
// So we should always clean all possible locations here
for (StorageLocation location : getSortedList(locations)) {
- File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
- if (localStorageDir.exists()) {
+ Path localStorageDir = location.getPath().resolve(DataSegmentPusher.getDefaultStorageDir(segment, false));
+ if (Files.exists(localStorageDir)) {
// Druid creates folders of the form dataSource/interval/version/partitionNum.
// We need to clean up all these directories if they are all empty.
cleanupCacheFiles(location.getPath(), localStorageDir);
@@ -236,7 +245,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
}
}
- private void cleanupCacheFiles(File baseFile, File cacheFile)
+ private void cleanupCacheFiles(Path baseFile, Path cacheFile)
{
if (cacheFile.equals(baseFile)) {
return;
@@ -245,17 +254,38 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
synchronized (lock) {
log.info("Deleting directory[%s]", cacheFile);
try {
- FileUtils.deleteDirectory(cacheFile);
+ Files.walkFileTree(cacheFile, new SimpleFileVisitor()
+ {
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException
+ {
+ Files.delete(file);
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException
+ {
+ Files.delete(dir);
+ return FileVisitResult.CONTINUE;
+ }
+ });
}
- catch (Exception e) {
- log.error("Unable to remove file[%s]", cacheFile);
+ catch (IOException e) {
+ log.error("Unable to remove directory[%s]", cacheFile);
}
}
- File parent = cacheFile.getParentFile();
+ Path parent = cacheFile.getParent();
if (parent != null) {
- File[] children = parent.listFiles();
- if (children == null || children.length == 0) {
+ long children;
+ try {
+ children = Files.list(parent).count();
+ }
+ catch (IOException e) {
+ children = 0;
+ }
+ if (children == 0) {
cleanupCacheFiles(baseFile, parent);
}
}
diff --git a/server/src/main/java/io/druid/segment/loading/StorageLocation.java b/server/src/main/java/io/druid/segment/loading/StorageLocation.java
index 12fcf1d93c0..318d2855d53 100644
--- a/server/src/main/java/io/druid/segment/loading/StorageLocation.java
+++ b/server/src/main/java/io/druid/segment/loading/StorageLocation.java
@@ -24,29 +24,31 @@ import io.druid.java.util.common.logger.Logger;
import io.druid.timeline.DataSegment;
import javax.annotation.Nullable;
-import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.Set;
/**
-*/
+ */
class StorageLocation
{
private static final Logger log = new Logger(StorageLocation.class);
- private final File path;
+ private final Path path;
private final long maxSize;
private final long freeSpaceToKeep;
private final Set segments;
private volatile long currSize = 0;
- StorageLocation(File path, long maxSize, @Nullable Double freeSpacePercent)
+ StorageLocation(Path path, long maxSize, @Nullable Double freeSpacePercent)
{
this.path = path;
this.maxSize = maxSize;
if (freeSpacePercent != null) {
- long totalSpaceInPartition = path.getTotalSpace();
+ long totalSpaceInPartition = getTotalSpaceInPartition();
this.freeSpaceToKeep = (long) ((freeSpacePercent * totalSpaceInPartition) / 100);
log.info(
"SegmentLocation[%s] will try and maintain [%d:%d] free space while loading segments.",
@@ -61,7 +63,7 @@ class StorageLocation
this.segments = Sets.newHashSet();
}
- File getPath()
+ Path getPath()
{
return path;
}
@@ -90,19 +92,19 @@ class StorageLocation
if (available() < segment.getSize()) {
log.warn(
"Segment[%s:%,d] too lage for storage[%s:%,d].",
- segment.getIdentifier(), segment.getSize(), getPath(), available()
+ segment.getIdentifier(), segment.getSize(), path, available()
);
return false;
}
if (freeSpaceToKeep > 0) {
- long currFreeSpace = path.getFreeSpace();
+ long currFreeSpace = getCurrFreeSpace();
if ((freeSpaceToKeep + segment.getSize()) > currFreeSpace) {
log.warn(
"Segment[%s:%,d] too large for storage[%s:%,d] to maintain suggested freeSpace[%d], current freeSpace is [%d].",
segment.getIdentifier(),
segment.getSize(),
- getPath(),
+ path,
available(),
freeSpaceToKeep,
currFreeSpace
@@ -118,4 +120,28 @@ class StorageLocation
{
return maxSize - currSize;
}
+
+ private long getCurrFreeSpace()
+ {
+ long currFreeSpace = 0;
+ try {
+ currFreeSpace = Files.getFileStore(path).getUsableSpace();
+ }
+ catch (IOException e) {
+ log.warn(e, "Unable to get free space of SegmentLocation[%s].", path);
+ }
+ return currFreeSpace;
+ }
+
+ private long getTotalSpaceInPartition()
+ {
+ long totalSpaceInPartition = 0;
+ try {
+ totalSpaceInPartition = Files.getFileStore(path).getTotalSpace();
+ }
+ catch (IOException e) {
+ log.warn(e, "Unable to get total size of SegmentLocation[%s].", path);
+ }
+ return totalSpaceInPartition;
+ }
}
diff --git a/server/src/test/java/io/druid/segment/loading/StorageLocationTest.java b/server/src/test/java/io/druid/segment/loading/StorageLocationTest.java
index 31c200dc24d..f8b1a105f62 100644
--- a/server/src/test/java/io/druid/segment/loading/StorageLocationTest.java
+++ b/server/src/test/java/io/druid/segment/loading/StorageLocationTest.java
@@ -20,13 +20,18 @@
package io.druid.segment.loading;
import com.google.common.collect.ImmutableMap;
+import com.google.common.jimfs.Configuration;
+import com.google.common.jimfs.Jimfs;
+import com.google.common.jimfs.PathType;
import io.druid.java.util.common.Intervals;
import io.druid.timeline.DataSegment;
-import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.Arrays;
/**
@@ -34,38 +39,47 @@ import java.util.Arrays;
public class StorageLocationTest
{
@Test
- public void testStorageLocationFreePercent()
+ public void testStorageLocationFreePercent() throws IOException
{
// free space ignored only maxSize matters
StorageLocation locationPlain = fakeLocation(100_000, 5_000, 10_000, null);
Assert.assertTrue(locationPlain.canHandle(makeSegment("2012/2013", 9_000)));
Assert.assertFalse(locationPlain.canHandle(makeSegment("2012/2013", 11_000)));
+ locationPlain.getPath().getFileSystem().close();
// enough space available maxSize is the limit
StorageLocation locationFree = fakeLocation(100_000, 25_000, 10_000, 10.0);
Assert.assertTrue(locationFree.canHandle(makeSegment("2012/2013", 9_000)));
Assert.assertFalse(locationFree.canHandle(makeSegment("2012/2013", 11_000)));
+ locationFree.getPath().getFileSystem().close();
// disk almost full percentage is the limit
StorageLocation locationFull = fakeLocation(100_000, 15_000, 10_000, 10.0);
Assert.assertTrue(locationFull.canHandle(makeSegment("2012/2013", 4_000)));
Assert.assertFalse(locationFull.canHandle(makeSegment("2012/2013", 6_000)));
+ locationFull.getPath().getFileSystem().close();
}
- private StorageLocation fakeLocation(long total, long free, long max, Double percent)
+ private StorageLocation fakeLocation(long total, long free, long max, Double percent) throws IOException
{
- File file = EasyMock.mock(File.class);
- EasyMock.expect(file.getTotalSpace()).andReturn(total).anyTimes();
- EasyMock.expect(file.getFreeSpace()).andReturn(free).anyTimes();
- EasyMock.replay(file);
- return new StorageLocation(file, max, percent);
+ Configuration config = Configuration.builder(PathType.unix())
+ .setRoots("/")
+ .setWorkingDirectory("/")
+ .setAttributeViews("basic")
+ .setMaxSize(total)
+ .setBlockSize(1000)
+ .build();
+ Path path = Jimfs.newFileSystem(config).getPath("/");
+ Files.write(path.resolve("junk"), new byte[(int) (total - free)]); //TODO is there a better way?
+ Assert.assertEquals(free, Files.getFileStore(path).getUsableSpace());
+ return new StorageLocation(path, max, percent);
}
@Test
public void testStorageLocation()
{
long expectedAvail = 1000L;
- StorageLocation loc = new StorageLocation(new File("/tmp"), expectedAvail, null);
+ StorageLocation loc = new StorageLocation(new File("/tmp").toPath(), expectedAvail, null);
verifyLoc(expectedAvail, loc);