mirror of https://github.com/apache/druid.git
switch to Path and test with jimfs
Path allows to use different filesystems. Jimfs provides an actual (in memory) filesystem. This also allows more complex test scenarios. The behavior should be unchanged by this commit.
This commit is contained in:
parent
2832f012d4
commit
8b9a418d65
|
@ -254,6 +254,12 @@
|
|||
<version>1.0.4</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.jimfs</groupId>
|
||||
<artifactId>jimfs</artifactId>
|
||||
<version>1.1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -28,10 +28,14 @@ import io.druid.java.util.emitter.EmittingLogger;
|
|||
import io.druid.segment.IndexIO;
|
||||
import io.druid.segment.Segment;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.FileVisitResult;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.SimpleFileVisitor;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
|
@ -53,7 +57,8 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
|
|||
|
||||
private static final Comparator<StorageLocation> COMPARATOR = new Comparator<StorageLocation>()
|
||||
{
|
||||
@Override public int compare(StorageLocation left, StorageLocation right)
|
||||
@Override
|
||||
public int compare(StorageLocation left, StorageLocation right)
|
||||
{
|
||||
return Longs.compare(right.available(), left.available());
|
||||
}
|
||||
|
@ -73,7 +78,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
|
|||
this.locations = Lists.newArrayList();
|
||||
for (StorageLocationConfig locationConfig : config.getLocations()) {
|
||||
locations.add(new StorageLocation(
|
||||
locationConfig.getPath(),
|
||||
locationConfig.getPath().toPath(),
|
||||
locationConfig.getMaxSize(),
|
||||
locationConfig.getFreeSpacePercent()
|
||||
));
|
||||
|
@ -94,8 +99,8 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
|
|||
private StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
|
||||
{
|
||||
for (StorageLocation location : getSortedList(locations)) {
|
||||
File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
|
||||
if (localStorageDir.exists()) {
|
||||
Path localStorageDir = location.getPath().resolve(DataSegmentPusher.getDefaultStorageDir(segment, false));
|
||||
if (Files.exists(localStorageDir)) {
|
||||
return location;
|
||||
}
|
||||
}
|
||||
|
@ -133,7 +138,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
|
|||
loc = loadSegmentWithRetry(segment, storageDir);
|
||||
}
|
||||
loc.addSegment(segment);
|
||||
return new File(loc.getPath(), storageDir);
|
||||
return loc.getPath().resolve(storageDir).toFile();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -145,7 +150,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
|
|||
{
|
||||
for (StorageLocation loc : getSortedList(locations)) {
|
||||
if (loc.canHandle(segment)) {
|
||||
File storageDir = new File(loc.getPath(), storageDirStr);
|
||||
Path storageDir = loc.getPath().resolve(storageDirStr);
|
||||
|
||||
try {
|
||||
loadInLocationWithStartMarker(segment, storageDir);
|
||||
|
@ -155,9 +160,9 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
|
|||
log.makeAlert(
|
||||
e,
|
||||
"Failed to load segment in current location %s, try next location if any",
|
||||
loc.getPath().getAbsolutePath()
|
||||
loc.getPath().toAbsolutePath()
|
||||
)
|
||||
.addData("location", loc.getPath().getAbsolutePath())
|
||||
.addData("location", loc.getPath().toAbsolutePath())
|
||||
.emit();
|
||||
|
||||
cleanupCacheFiles(loc.getPath(), storageDir);
|
||||
|
@ -167,19 +172,20 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
|
|||
throw new SegmentLoadingException("Failed to load segment %s in all locations.", segment.getIdentifier());
|
||||
}
|
||||
|
||||
private void loadInLocationWithStartMarker(DataSegment segment, File storageDir) throws SegmentLoadingException
|
||||
private void loadInLocationWithStartMarker(DataSegment segment, Path storageDir) throws SegmentLoadingException
|
||||
{
|
||||
// We use a marker to prevent the case where a segment is downloaded, but before the download completes,
|
||||
// the parent directories of the segment are removed
|
||||
final File downloadStartMarker = new File(storageDir, "downloadStartMarker");
|
||||
final Path downloadStartMarker = storageDir.resolve("downloadStartMarker");
|
||||
synchronized (lock) {
|
||||
if (!storageDir.mkdirs()) {
|
||||
log.debug("Unable to make parent file[%s]", storageDir);
|
||||
try {
|
||||
Files.createDirectories(storageDir);
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.warn(e, "Unable to make parent file[%s]", storageDir);
|
||||
}
|
||||
try {
|
||||
if (!downloadStartMarker.createNewFile()) {
|
||||
throw new SegmentLoadingException("Was not able to create new download marker for [%s]", storageDir);
|
||||
}
|
||||
Files.createFile(downloadStartMarker);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new SegmentLoadingException(e, "Unable to create marker file for [%s]", storageDir);
|
||||
|
@ -187,17 +193,20 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
|
|||
}
|
||||
loadInLocation(segment, storageDir);
|
||||
|
||||
if (!downloadStartMarker.delete()) {
|
||||
try {
|
||||
Files.delete(downloadStartMarker);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new SegmentLoadingException("Unable to remove marker file for [%s]", storageDir);
|
||||
}
|
||||
}
|
||||
|
||||
private void loadInLocation(DataSegment segment, File storageDir) throws SegmentLoadingException
|
||||
private void loadInLocation(DataSegment segment, Path storageDir) throws SegmentLoadingException
|
||||
{
|
||||
// LoadSpec isn't materialized until here so that any system can interpret Segment without having to have all the
|
||||
// LoadSpec dependencies.
|
||||
final LoadSpec loadSpec = jsonMapper.convertValue(segment.getLoadSpec(), LoadSpec.class);
|
||||
final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir);
|
||||
final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir.toFile());
|
||||
if (result.getSize() != segment.getSize()) {
|
||||
log.warn(
|
||||
"Segment [%s] is different than expected size. Expected [%d] found [%d]",
|
||||
|
@ -226,8 +235,8 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
|
|||
// in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not.
|
||||
// So we should always clean all possible locations here
|
||||
for (StorageLocation location : getSortedList(locations)) {
|
||||
File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
|
||||
if (localStorageDir.exists()) {
|
||||
Path localStorageDir = location.getPath().resolve(DataSegmentPusher.getDefaultStorageDir(segment, false));
|
||||
if (Files.exists(localStorageDir)) {
|
||||
// Druid creates folders of the form dataSource/interval/version/partitionNum.
|
||||
// We need to clean up all these directories if they are all empty.
|
||||
cleanupCacheFiles(location.getPath(), localStorageDir);
|
||||
|
@ -236,7 +245,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
|
|||
}
|
||||
}
|
||||
|
||||
private void cleanupCacheFiles(File baseFile, File cacheFile)
|
||||
private void cleanupCacheFiles(Path baseFile, Path cacheFile)
|
||||
{
|
||||
if (cacheFile.equals(baseFile)) {
|
||||
return;
|
||||
|
@ -245,17 +254,38 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
|
|||
synchronized (lock) {
|
||||
log.info("Deleting directory[%s]", cacheFile);
|
||||
try {
|
||||
FileUtils.deleteDirectory(cacheFile);
|
||||
Files.walkFileTree(cacheFile, new SimpleFileVisitor<Path>()
|
||||
{
|
||||
@Override
|
||||
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException
|
||||
{
|
||||
Files.delete(file);
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException
|
||||
{
|
||||
Files.delete(dir);
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error("Unable to remove file[%s]", cacheFile);
|
||||
catch (IOException e) {
|
||||
log.error("Unable to remove directory[%s]", cacheFile);
|
||||
}
|
||||
}
|
||||
|
||||
File parent = cacheFile.getParentFile();
|
||||
Path parent = cacheFile.getParent();
|
||||
if (parent != null) {
|
||||
File[] children = parent.listFiles();
|
||||
if (children == null || children.length == 0) {
|
||||
long children;
|
||||
try {
|
||||
children = Files.list(parent).count();
|
||||
}
|
||||
catch (IOException e) {
|
||||
children = 0;
|
||||
}
|
||||
if (children == 0) {
|
||||
cleanupCacheFiles(baseFile, parent);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,29 +24,31 @@ import io.druid.java.util.common.logger.Logger;
|
|||
import io.druid.timeline.DataSegment;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*/
|
||||
*/
|
||||
class StorageLocation
|
||||
{
|
||||
private static final Logger log = new Logger(StorageLocation.class);
|
||||
|
||||
private final File path;
|
||||
private final Path path;
|
||||
private final long maxSize;
|
||||
private final long freeSpaceToKeep;
|
||||
private final Set<DataSegment> segments;
|
||||
|
||||
private volatile long currSize = 0;
|
||||
|
||||
StorageLocation(File path, long maxSize, @Nullable Double freeSpacePercent)
|
||||
StorageLocation(Path path, long maxSize, @Nullable Double freeSpacePercent)
|
||||
{
|
||||
this.path = path;
|
||||
this.maxSize = maxSize;
|
||||
|
||||
if (freeSpacePercent != null) {
|
||||
long totalSpaceInPartition = path.getTotalSpace();
|
||||
long totalSpaceInPartition = getTotalSpaceInPartition();
|
||||
this.freeSpaceToKeep = (long) ((freeSpacePercent * totalSpaceInPartition) / 100);
|
||||
log.info(
|
||||
"SegmentLocation[%s] will try and maintain [%d:%d] free space while loading segments.",
|
||||
|
@ -61,7 +63,7 @@ class StorageLocation
|
|||
this.segments = Sets.newHashSet();
|
||||
}
|
||||
|
||||
File getPath()
|
||||
Path getPath()
|
||||
{
|
||||
return path;
|
||||
}
|
||||
|
@ -90,19 +92,19 @@ class StorageLocation
|
|||
if (available() < segment.getSize()) {
|
||||
log.warn(
|
||||
"Segment[%s:%,d] too lage for storage[%s:%,d].",
|
||||
segment.getIdentifier(), segment.getSize(), getPath(), available()
|
||||
segment.getIdentifier(), segment.getSize(), path, available()
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (freeSpaceToKeep > 0) {
|
||||
long currFreeSpace = path.getFreeSpace();
|
||||
long currFreeSpace = getCurrFreeSpace();
|
||||
if ((freeSpaceToKeep + segment.getSize()) > currFreeSpace) {
|
||||
log.warn(
|
||||
"Segment[%s:%,d] too large for storage[%s:%,d] to maintain suggested freeSpace[%d], current freeSpace is [%d].",
|
||||
segment.getIdentifier(),
|
||||
segment.getSize(),
|
||||
getPath(),
|
||||
path,
|
||||
available(),
|
||||
freeSpaceToKeep,
|
||||
currFreeSpace
|
||||
|
@ -118,4 +120,28 @@ class StorageLocation
|
|||
{
|
||||
return maxSize - currSize;
|
||||
}
|
||||
|
||||
private long getCurrFreeSpace()
|
||||
{
|
||||
long currFreeSpace = 0;
|
||||
try {
|
||||
currFreeSpace = Files.getFileStore(path).getUsableSpace();
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.warn(e, "Unable to get free space of SegmentLocation[%s].", path);
|
||||
}
|
||||
return currFreeSpace;
|
||||
}
|
||||
|
||||
private long getTotalSpaceInPartition()
|
||||
{
|
||||
long totalSpaceInPartition = 0;
|
||||
try {
|
||||
totalSpaceInPartition = Files.getFileStore(path).getTotalSpace();
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.warn(e, "Unable to get total size of SegmentLocation[%s].", path);
|
||||
}
|
||||
return totalSpaceInPartition;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,13 +20,18 @@
|
|||
package io.druid.segment.loading;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.jimfs.Configuration;
|
||||
import com.google.common.jimfs.Jimfs;
|
||||
import com.google.common.jimfs.PathType;
|
||||
import io.druid.java.util.common.Intervals;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.easymock.EasyMock;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
|
@ -34,38 +39,47 @@ import java.util.Arrays;
|
|||
public class StorageLocationTest
|
||||
{
|
||||
@Test
|
||||
public void testStorageLocationFreePercent()
|
||||
public void testStorageLocationFreePercent() throws IOException
|
||||
{
|
||||
// free space ignored only maxSize matters
|
||||
StorageLocation locationPlain = fakeLocation(100_000, 5_000, 10_000, null);
|
||||
Assert.assertTrue(locationPlain.canHandle(makeSegment("2012/2013", 9_000)));
|
||||
Assert.assertFalse(locationPlain.canHandle(makeSegment("2012/2013", 11_000)));
|
||||
locationPlain.getPath().getFileSystem().close();
|
||||
|
||||
// enough space available maxSize is the limit
|
||||
StorageLocation locationFree = fakeLocation(100_000, 25_000, 10_000, 10.0);
|
||||
Assert.assertTrue(locationFree.canHandle(makeSegment("2012/2013", 9_000)));
|
||||
Assert.assertFalse(locationFree.canHandle(makeSegment("2012/2013", 11_000)));
|
||||
locationFree.getPath().getFileSystem().close();
|
||||
|
||||
// disk almost full percentage is the limit
|
||||
StorageLocation locationFull = fakeLocation(100_000, 15_000, 10_000, 10.0);
|
||||
Assert.assertTrue(locationFull.canHandle(makeSegment("2012/2013", 4_000)));
|
||||
Assert.assertFalse(locationFull.canHandle(makeSegment("2012/2013", 6_000)));
|
||||
locationFull.getPath().getFileSystem().close();
|
||||
}
|
||||
|
||||
private StorageLocation fakeLocation(long total, long free, long max, Double percent)
|
||||
private StorageLocation fakeLocation(long total, long free, long max, Double percent) throws IOException
|
||||
{
|
||||
File file = EasyMock.mock(File.class);
|
||||
EasyMock.expect(file.getTotalSpace()).andReturn(total).anyTimes();
|
||||
EasyMock.expect(file.getFreeSpace()).andReturn(free).anyTimes();
|
||||
EasyMock.replay(file);
|
||||
return new StorageLocation(file, max, percent);
|
||||
Configuration config = Configuration.builder(PathType.unix())
|
||||
.setRoots("/")
|
||||
.setWorkingDirectory("/")
|
||||
.setAttributeViews("basic")
|
||||
.setMaxSize(total)
|
||||
.setBlockSize(1000)
|
||||
.build();
|
||||
Path path = Jimfs.newFileSystem(config).getPath("/");
|
||||
Files.write(path.resolve("junk"), new byte[(int) (total - free)]); //TODO is there a better way?
|
||||
Assert.assertEquals(free, Files.getFileStore(path).getUsableSpace());
|
||||
return new StorageLocation(path, max, percent);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStorageLocation()
|
||||
{
|
||||
long expectedAvail = 1000L;
|
||||
StorageLocation loc = new StorageLocation(new File("/tmp"), expectedAvail, null);
|
||||
StorageLocation loc = new StorageLocation(new File("/tmp").toPath(), expectedAvail, null);
|
||||
|
||||
verifyLoc(expectedAvail, loc);
|
||||
|
||||
|
|
Loading…
Reference in New Issue