diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java index cb3cbb5309f..4995c323027 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -20,7 +20,7 @@ package io.druid.segment.loading; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Sets; +import com.google.common.collect.Lists; import com.google.common.primitives.Longs; import com.google.inject.Inject; import com.metamx.common.ISE; @@ -33,8 +33,10 @@ import org.apache.commons.io.FileUtils; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; -import java.util.SortedSet; +import java.util.List; /** */ @@ -46,10 +48,18 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader private final SegmentLoaderConfig config; private final ObjectMapper jsonMapper; - private final SortedSet locations; + private final List locations; private final Object lock = new Object(); + private static final Comparator COMPARATOR = new Comparator() + { + @Override public int compare(StorageLocation left, StorageLocation right) + { + return Longs.compare(right.available(), left.available()); + } + }; + @Inject public SegmentLoaderLocalCacheManager( IndexIO indexIO, @@ -61,15 +71,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader this.config = config; this.jsonMapper = mapper; - this.locations = Sets.newTreeSet(new Comparator() - { - @Override - public int compare(StorageLocation left, StorageLocation right) - { - // sorted from empty to full - return Longs.compare(right.available(), left.available()); - } - }); + this.locations = Lists.newArrayList(); for (StorageLocationConfig locationConfig : config.getLocations()) { locations.add(new StorageLocation(locationConfig.getPath(), locationConfig.getMaxSize())); } @@ -88,7 +90,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader public StorageLocation findStorageLocationIfLoaded(final DataSegment segment) { - for (StorageLocation location : locations) { + for (StorageLocation location : getSortedList(locations)) { File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); if (localStorageDir.exists()) { return location; @@ -138,7 +140,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader */ private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException { - for (StorageLocation loc : locations) { + for (StorageLocation loc : getSortedList(locations)) { // locIter is ordered from empty to full if (!loc.canHandle(segment.getSize())) { throw new ISE( @@ -230,7 +232,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader // If storageDir.mkdirs() success, but downloadStartMarker.createNewFile() failed, // in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not. // So we should always clean all possible locations here - for (StorageLocation location : locations) { + for (StorageLocation location : getSortedList(locations)) { File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); if (localStorageDir.exists()) { // Druid creates folders of the form dataSource/interval/version/partitionNum. @@ -270,4 +272,12 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader } } } + + public List getSortedList(List locs) + { + List locations = new ArrayList<>(locs); + Collections.sort(locations, COMPARATOR); + + return locations; + } } diff --git a/server/src/test/java/io/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java b/server/src/test/java/io/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java index 85bdc4ca837..cdb72409fa1 100644 --- a/server/src/test/java/io/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java +++ b/server/src/test/java/io/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java @@ -294,6 +294,83 @@ public class SegmentLoaderLocalCacheManagerTest manager.cleanup(segmentToDownload); } + @Test + public void testEmptyToFullOrder() throws Exception + { + final List locations = Lists.newArrayList(); + final StorageLocationConfig locationConfig = new StorageLocationConfig(); + final File localStorageFolder = tmpFolder.newFolder("local_storage_folder"); + localStorageFolder.setWritable(true); + locationConfig.setPath(localStorageFolder); + locationConfig.setMaxSize(10L); + locations.add(locationConfig); + final StorageLocationConfig locationConfig2 = new StorageLocationConfig(); + final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2"); + localStorageFolder2.setWritable(true); + locationConfig2.setPath(localStorageFolder2); + locationConfig2.setMaxSize(10L); + locations.add(locationConfig2); + + manager = new SegmentLoaderLocalCacheManager( + TestHelper.getTestIndexIO(), + new SegmentLoaderConfig().withLocations(locations), + jsonMapper + ); + final File segmentSrcFolder = tmpFolder.newFolder("segmentSrcFolder"); + final DataSegment segmentToDownload = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + segmentSrcFolder.getCanonicalPath() + + "/test_segment_loader" + + "/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z" + + "/0/index.zip" + ) + ); + // manually create a local segment under segmentSrcFolder + final File localSegmentFile = new File( + segmentSrcFolder, + "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0" + ); + localSegmentFile.mkdirs(); + final File indexZip = new File(localSegmentFile, "index.zip"); + indexZip.createNewFile(); + + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload)); + + File segmentFile = manager.getSegmentFiles(segmentToDownload); + Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/")); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload)); + + final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D").withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + segmentSrcFolder.getCanonicalPath() + + "/test_segment_loader" + + "/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z" + + "/0/index.zip" + ) + ); + // manually create a local segment under segmentSrcFolder + final File localSegmentFile2 = new File( + segmentSrcFolder, + "test_segment_loader/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0" + ); + localSegmentFile2.mkdirs(); + final File indexZip2 = new File(localSegmentFile2, "index.zip"); + indexZip2.createNewFile(); + + File segmentFile2 = manager.getSegmentFiles(segmentToDownload2); + Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder2/")); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2)); + + manager.cleanup(segmentToDownload2); + Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload2)); + } + private DataSegment dataSegmentWithInterval(String intervalStr) { return DataSegment.builder() @@ -312,7 +389,7 @@ public class SegmentLoaderLocalCacheManagerTest .metrics(ImmutableList.of()) .shardSpec(NoneShardSpec.instance()) .binaryVersion(9) - .size(0) + .size(10L) .build(); } }