mirror of https://github.com/apache/druid.git
Fix bug with SegmentLoaderLocalCacheManager (#3929)
* Fix bug with SegmentLoaderLocalCacheManager * Use collections.sort
This commit is contained in:
parent
2ead572639
commit
02fc625b5f
|
@ -20,7 +20,7 @@
|
|||
package io.druid.segment.loading;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.primitives.Longs;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.ISE;
|
||||
|
@ -33,8 +33,10 @@ import org.apache.commons.io.FileUtils;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.SortedSet;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -46,10 +48,18 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
|
|||
private final SegmentLoaderConfig config;
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
private final SortedSet<StorageLocation> locations;
|
||||
private final List<StorageLocation> locations;
|
||||
|
||||
private final Object lock = new Object();
|
||||
|
||||
private static final Comparator<StorageLocation> COMPARATOR = new Comparator<StorageLocation>()
|
||||
{
|
||||
@Override public int compare(StorageLocation left, StorageLocation right)
|
||||
{
|
||||
return Longs.compare(right.available(), left.available());
|
||||
}
|
||||
};
|
||||
|
||||
@Inject
|
||||
public SegmentLoaderLocalCacheManager(
|
||||
IndexIO indexIO,
|
||||
|
@ -61,15 +71,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
|
|||
this.config = config;
|
||||
this.jsonMapper = mapper;
|
||||
|
||||
this.locations = Sets.newTreeSet(new Comparator<StorageLocation>()
|
||||
{
|
||||
@Override
|
||||
public int compare(StorageLocation left, StorageLocation right)
|
||||
{
|
||||
// sorted from empty to full
|
||||
return Longs.compare(right.available(), left.available());
|
||||
}
|
||||
});
|
||||
this.locations = Lists.newArrayList();
|
||||
for (StorageLocationConfig locationConfig : config.getLocations()) {
|
||||
locations.add(new StorageLocation(locationConfig.getPath(), locationConfig.getMaxSize()));
|
||||
}
|
||||
|
@ -88,7 +90,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
|
|||
|
||||
public StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
|
||||
{
|
||||
for (StorageLocation location : locations) {
|
||||
for (StorageLocation location : getSortedList(locations)) {
|
||||
File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
|
||||
if (localStorageDir.exists()) {
|
||||
return location;
|
||||
|
@ -138,7 +140,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
|
|||
*/
|
||||
private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException
|
||||
{
|
||||
for (StorageLocation loc : locations) {
|
||||
for (StorageLocation loc : getSortedList(locations)) {
|
||||
// locIter is ordered from empty to full
|
||||
if (!loc.canHandle(segment.getSize())) {
|
||||
throw new ISE(
|
||||
|
@ -230,7 +232,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
|
|||
// If storageDir.mkdirs() success, but downloadStartMarker.createNewFile() failed,
|
||||
// in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not.
|
||||
// So we should always clean all possible locations here
|
||||
for (StorageLocation location : locations) {
|
||||
for (StorageLocation location : getSortedList(locations)) {
|
||||
File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
|
||||
if (localStorageDir.exists()) {
|
||||
// Druid creates folders of the form dataSource/interval/version/partitionNum.
|
||||
|
@ -270,4 +272,12 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public List<StorageLocation> getSortedList(List<StorageLocation> locs)
|
||||
{
|
||||
List<StorageLocation> locations = new ArrayList<>(locs);
|
||||
Collections.sort(locations, COMPARATOR);
|
||||
|
||||
return locations;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -294,6 +294,83 @@ public class SegmentLoaderLocalCacheManagerTest
|
|||
manager.cleanup(segmentToDownload);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmptyToFullOrder() throws Exception
|
||||
{
|
||||
final List<StorageLocationConfig> locations = Lists.newArrayList();
|
||||
final StorageLocationConfig locationConfig = new StorageLocationConfig();
|
||||
final File localStorageFolder = tmpFolder.newFolder("local_storage_folder");
|
||||
localStorageFolder.setWritable(true);
|
||||
locationConfig.setPath(localStorageFolder);
|
||||
locationConfig.setMaxSize(10L);
|
||||
locations.add(locationConfig);
|
||||
final StorageLocationConfig locationConfig2 = new StorageLocationConfig();
|
||||
final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2");
|
||||
localStorageFolder2.setWritable(true);
|
||||
locationConfig2.setPath(localStorageFolder2);
|
||||
locationConfig2.setMaxSize(10L);
|
||||
locations.add(locationConfig2);
|
||||
|
||||
manager = new SegmentLoaderLocalCacheManager(
|
||||
TestHelper.getTestIndexIO(),
|
||||
new SegmentLoaderConfig().withLocations(locations),
|
||||
jsonMapper
|
||||
);
|
||||
final File segmentSrcFolder = tmpFolder.newFolder("segmentSrcFolder");
|
||||
final DataSegment segmentToDownload = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withLoadSpec(
|
||||
ImmutableMap.<String, Object>of(
|
||||
"type",
|
||||
"local",
|
||||
"path",
|
||||
segmentSrcFolder.getCanonicalPath()
|
||||
+ "/test_segment_loader"
|
||||
+ "/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
|
||||
+ "/0/index.zip"
|
||||
)
|
||||
);
|
||||
// manually create a local segment under segmentSrcFolder
|
||||
final File localSegmentFile = new File(
|
||||
segmentSrcFolder,
|
||||
"test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"
|
||||
);
|
||||
localSegmentFile.mkdirs();
|
||||
final File indexZip = new File(localSegmentFile, "index.zip");
|
||||
indexZip.createNewFile();
|
||||
|
||||
Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload));
|
||||
|
||||
File segmentFile = manager.getSegmentFiles(segmentToDownload);
|
||||
Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/"));
|
||||
Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
|
||||
|
||||
final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D").withLoadSpec(
|
||||
ImmutableMap.<String, Object>of(
|
||||
"type",
|
||||
"local",
|
||||
"path",
|
||||
segmentSrcFolder.getCanonicalPath()
|
||||
+ "/test_segment_loader"
|
||||
+ "/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
|
||||
+ "/0/index.zip"
|
||||
)
|
||||
);
|
||||
// manually create a local segment under segmentSrcFolder
|
||||
final File localSegmentFile2 = new File(
|
||||
segmentSrcFolder,
|
||||
"test_segment_loader/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"
|
||||
);
|
||||
localSegmentFile2.mkdirs();
|
||||
final File indexZip2 = new File(localSegmentFile2, "index.zip");
|
||||
indexZip2.createNewFile();
|
||||
|
||||
File segmentFile2 = manager.getSegmentFiles(segmentToDownload2);
|
||||
Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder2/"));
|
||||
Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2));
|
||||
|
||||
manager.cleanup(segmentToDownload2);
|
||||
Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload2));
|
||||
}
|
||||
|
||||
private DataSegment dataSegmentWithInterval(String intervalStr)
|
||||
{
|
||||
return DataSegment.builder()
|
||||
|
@ -312,7 +389,7 @@ public class SegmentLoaderLocalCacheManagerTest
|
|||
.metrics(ImmutableList.<String>of())
|
||||
.shardSpec(NoneShardSpec.instance())
|
||||
.binaryVersion(9)
|
||||
.size(0)
|
||||
.size(10L)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue