retry loadSegment with all locations (#3681)

commit f995b1426f (parent 5440a06b2d)
kaijianding authored on 2016-12-07 04:00:59 +08:00; committed by Slim
3 changed files with 266 additions and 58 deletions
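The change replaces the single-shot location pick in getSegmentFiles() with a loop over all configured storage locations, kept sorted by free space, that falls through to the next location when a load fails. A minimal standalone sketch of that retry flow (simplified stand-in types and values, not the committed Druid classes):

import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

public class RetryFlowSketch
{
  // Simplified stand-in for Druid's StorageLocation.
  static final class Location
  {
    final String path;
    final long available;

    Location(String path, long available)
    {
      this.path = path;
      this.available = available;
    }

    // Hypothetical download step; pretend paths under /bad sit on a failed disk.
    boolean tryLoad(String segmentId)
    {
      return !path.startsWith("/bad");
    }
  }

  public static void main(String[] args)
  {
    // Sorted emptiest-first, mirroring the comparator added in this commit.
    SortedSet<Location> locations = new TreeSet<>(
        Comparator.comparingLong((Location loc) -> loc.available).reversed()
    );
    locations.add(new Location("/bad/disk1", 900L));   // emptiest, but broken
    locations.add(new Location("/good/disk2", 400L));

    // Walk locations in order; the first successful load wins.
    for (Location loc : locations) {
      if (loc.tryLoad("segment-2014-10-20")) {
        System.out.println("loaded into " + loc.path);
        return;
      }
      System.out.println("failed at " + loc.path + ", trying next location");
    }
    throw new IllegalStateException("Failed to load segment in all locations.");
  }
}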

io/druid/segment/loading/SegmentLoaderLocalCacheManager.java

@@ -20,12 +20,12 @@
 package io.druid.segment.loading;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
 import com.google.inject.Inject;
+import com.metamx.common.ISE;
+import com.metamx.emitter.EmittingLogger;
 import io.druid.guice.annotations.Json;
-import io.druid.java.util.common.ISE;
-import io.druid.java.util.common.logger.Logger;
 import io.druid.segment.QueryableIndex;
 import io.druid.segment.QueryableIndexSegment;
 import io.druid.segment.Segment;
@@ -34,20 +34,20 @@ import org.apache.commons.io.FileUtils;
 import java.io.File;
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
+import java.util.Comparator;
+import java.util.SortedSet;
 /**
  */
 public class SegmentLoaderLocalCacheManager implements SegmentLoader
 {
-  private static final Logger log = new Logger(SegmentLoaderLocalCacheManager.class);
+  private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class);
   private final QueryableIndexFactory factory;
   private final SegmentLoaderConfig config;
   private final ObjectMapper jsonMapper;
-  private final List<StorageLocation> locations;
+  private final SortedSet<StorageLocation> locations;
   private final Object lock = new Object();
@@ -62,7 +62,15 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
     this.config = config;
     this.jsonMapper = mapper;
-    this.locations = Lists.newArrayList();
+    this.locations = Sets.newTreeSet(new Comparator<StorageLocation>()
+    {
+      @Override
+      public int compare(StorageLocation left, StorageLocation right)
+      {
+        // sorted from empty to full
+        return Longs.compare(right.available(), left.available());
+      }
+    });
     for (StorageLocationConfig locationConfig : config.getLocations()) {
       locations.add(new StorageLocation(locationConfig.getPath(), locationConfig.getMaxSize()));
     }
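The comparator above reads right-versus-left, which is easy to misread: compare(left, right) returns Longs.compare(right.available(), left.available()), i.e. descending by free space, so iteration starts at the emptiest location. A tiny standalone check with illustrative numbers:

import com.google.common.primitives.Longs;
import java.util.SortedSet;
import java.util.TreeSet;

public class ComparatorOrderCheck
{
  public static void main(String[] args)
  {
    // Same shape as the comparator above, applied to raw available() values.
    SortedSet<Long> available = new TreeSet<>((left, right) -> Longs.compare(right, left));
    available.add(100L);  // nearly full location
    available.add(900L);  // nearly empty location
    available.add(400L);
    System.out.println(available);  // [900, 400, 100] -- emptiest first
  }
}

One design consequence worth noting: a TreeSet positions elements only when they are inserted, so the iteration order reflects available() at construction time rather than tracking later changes to a location's free space.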
@@ -103,61 +111,90 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
   public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
   {
     StorageLocation loc = findStorageLocationIfLoaded(segment);
-    final File retVal;
+    String storageDir = DataSegmentPusherUtil.getStorageDir(segment);
     if (loc == null) {
-      Iterator<StorageLocation> locIter = locations.iterator();
-      loc = locIter.next();
-      while (locIter.hasNext()) {
-        loc = loc.mostEmpty(locIter.next());
-      }
+      loc = loadSegmentWithRetry(segment, storageDir);
+    }
+    loc.addSegment(segment);
+    return new File(loc.getPath(), storageDir);
+  }
+
+  /**
+   * A location may fail because of IO failure, most likely in two cases:<p>
+   * 1. druid doesn't have write access to this location, most likely because the administrator didn't configure it correctly<p>
+   * 2. disk failure, druid can't read/write to this disk anymore
+   */
+  private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException
+  {
+    for (StorageLocation loc : locations) {
+      // locations are sorted from empty to full
       if (!loc.canHandle(segment.getSize())) {
         throw new ISE(
             "Segment[%s:%,d] too large for storage[%s:%,d].",
             segment.getIdentifier(), segment.getSize(), loc.getPath(), loc.available()
         );
       }
-      File storageDir = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
+      File storageDir = new File(loc.getPath(), storageDirStr);
+      try {
+        loadInLocationWithStartMarker(segment, storageDir);
+        return loc;
+      }
+      catch (SegmentLoadingException e) {
+        log.makeAlert(e, "Failed to load segment in current location %s, try next location if any", loc.getPath().getAbsolutePath())
+           .addData("location", loc.getPath().getAbsolutePath())
+           .emit();
-      // We use a marker to prevent the case where a segment is downloaded, but before the download completes,
-      // the parent directories of the segment are removed
-      final File downloadStartMarker = new File(storageDir, "downloadStartMarker");
-      synchronized (lock) {
-        if (!storageDir.mkdirs()) {
-          log.debug("Unable to make parent file[%s]", storageDir);
-        }
         try {
-          if (!downloadStartMarker.createNewFile()) {
-            throw new SegmentLoadingException("Was not able to create new download marker for [%s]", storageDir);
-          }
+          cleanupCacheFiles(loc.getPath(), storageDir);
         }
-        catch (IOException e) {
-          throw new SegmentLoadingException(e, "Unable to create marker file for [%s]", storageDir);
+        catch (IOException e1) {
+          log.error(e1, "Failed to cleanup location " + storageDir.getAbsolutePath());
         }
       }
-      // LoadSpec isn't materialized until here so that any system can interpret Segment without having to have all the LoadSpec dependencies.
-      final LoadSpec loadSpec = jsonMapper.convertValue(segment.getLoadSpec(), LoadSpec.class);
-      final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir);
-      if(result.getSize() != segment.getSize()){
-        log.warn("Segment [%s] is different than expected size. Expected [%d] found [%d]", segment.getIdentifier(), segment.getSize(), result.getSize());
-      }
-      if (!downloadStartMarker.delete()) {
-        throw new SegmentLoadingException("Unable to remove marker file for [%s]", storageDir);
-      }
-      retVal = storageDir;
-    } else {
-      retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
     }
+    throw new SegmentLoadingException("Failed to load segment %s in all locations.", segment.getIdentifier());
   }
-    loc.addSegment(segment);
+
+  private void loadInLocationWithStartMarker(DataSegment segment, File storageDir) throws SegmentLoadingException
+  {
+    // We use a marker to prevent the case where a segment is downloaded, but before the download completes,
+    // the parent directories of the segment are removed
+    final File downloadStartMarker = new File(storageDir, "downloadStartMarker");
+    synchronized (lock) {
+      if (!storageDir.mkdirs()) {
+        log.debug("Unable to make parent file[%s]", storageDir);
+      }
+      try {
+        if (!downloadStartMarker.createNewFile()) {
+          throw new SegmentLoadingException("Was not able to create new download marker for [%s]", storageDir);
+        }
+      }
+      catch (IOException e) {
+        throw new SegmentLoadingException(e, "Unable to create marker file for [%s]", storageDir);
+      }
+    }
+    loadInLocation(segment, storageDir);
-    return retVal;
+    if (!downloadStartMarker.delete()) {
+      throw new SegmentLoadingException("Unable to remove marker file for [%s]", storageDir);
+    }
   }
+
+  private void loadInLocation(DataSegment segment, File storageDir) throws SegmentLoadingException
+  {
+    // LoadSpec isn't materialized until here so that any system can interpret Segment without having to have all the LoadSpec dependencies.
+    final LoadSpec loadSpec = jsonMapper.convertValue(segment.getLoadSpec(), LoadSpec.class);
+    final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir);
+    if (result.getSize() != segment.getSize()) {
+      log.warn(
+          "Segment [%s] is different than expected size. Expected [%d] found [%d]",
+          segment.getIdentifier(),
+          segment.getSize(),
+          result.getSize()
+      );
+    }
+  }

 @Override
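The marker protocol in loadInLocationWithStartMarker() brackets the real download: downloadStartMarker is created before the LoadSpec is pulled and deleted only after the pull completes, so any directory still containing the marker is known to hold a partial download and is safe for a cleanup pass to delete. A minimal sketch of the same bracket pattern (hypothetical Downloader interface, not Druid's API):

import java.io.File;
import java.io.IOException;

public class StartMarkerSketch
{
  interface Downloader
  {
    void fetchInto(File dir) throws IOException;  // hypothetical download step
  }

  static void downloadWithMarker(File storageDir, Downloader downloader) throws IOException
  {
    final File marker = new File(storageDir, "downloadStartMarker");
    if (!storageDir.mkdirs() && !storageDir.isDirectory()) {
      throw new IOException("Cannot create directory " + storageDir);
    }
    // Marker first: if the process dies mid-download, the marker survives as evidence.
    if (!marker.createNewFile()) {
      throw new IOException("Marker already exists; leftover partial download in " + storageDir);
    }
    downloader.fetchInto(storageDir);
    // Marker last: only a fully downloaded directory ever loses its marker.
    if (!marker.delete()) {
      throw new IOException("Unable to remove marker in " + storageDir);
    }
  }
}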
@@ -175,11 +212,19 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
     }
     try {
-      // Druid creates folders of the form dataSource/interval/version/partitionNum.
-      // We need to clean up all these directories if they are all empty.
-      File cacheFile = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
-      cleanupCacheFiles(loc.getPath(), cacheFile);
-      loc.removeSegment(segment);
+      // If storageDir.mkdirs() succeeded but downloadStartMarker.createNewFile() failed,
+      // findStorageLocationIfLoaded() will think the segment is located in the failed storageDir even though it is not.
+      // So we should always clean all possible locations here.
+      for (StorageLocation location : locations) {
+        File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
+        if (localStorageDir.exists()) {
+          // Druid creates folders of the form dataSource/interval/version/partitionNum.
+          // We need to clean up all these directories if they are all empty.
+          File cacheFile = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
+          cleanupCacheFiles(location.getPath(), cacheFile);
+          location.removeSegment(segment);
+        }
+      }
     }
     catch (IOException e) {
       throw new SegmentLoadingException(e, e.getMessage());
@@ -202,8 +247,12 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
       }
     }
-    if (cacheFile.getParentFile() != null && cacheFile.getParentFile().listFiles().length == 0) {
-      cleanupCacheFiles(baseFile, cacheFile.getParentFile());
+    File parent = cacheFile.getParentFile();
+    if (parent != null) {
+      File[] children = parent.listFiles();
+      if (children == null || children.length == 0) {
+        cleanupCacheFiles(baseFile, parent);
+      }
     }
   }
 }
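The last hunk also fixes a latent crash in cleanupCacheFiles(): File.listFiles() returns null (not an empty array) when the path is not a directory or an I/O error occurs, so the old parent.listFiles().length dereference could throw a NullPointerException on exactly the kind of flaky disk this commit is about. A small demonstration of the null return:

import java.io.File;

public class ListFilesNullDemo
{
  public static void main(String[] args)
  {
    // A plain file (or a missing path) yields null from listFiles(), never an empty array.
    File notADirectory = new File("plain-file.txt");
    File[] children = notADirectory.listFiles();
    System.out.println(children == null);  // true -- children.length would NPE here
  }
}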

io/druid/segment/loading/StorageLocation.java

@@ -76,9 +76,4 @@ class StorageLocation
   {
     return maxSize - currSize;
   }
-
-  StorageLocation mostEmpty(StorageLocation other)
-  {
-    return available() > other.available() ? this : other;
-  }
 }
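With the set now pre-sorted by the constructor's comparator, the removed mostEmpty() helper is dead code: the old caller folded it across an iterator just to find the single emptiest location, which is exactly the first element of the new SortedSet, and the sort order additionally provides the "next emptiest" fallback that retrying needs. Roughly, the two selection styles compare like this (plain long values standing in for available()):

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class MostEmptyComparison
{
  public static void main(String[] args)
  {
    List<Long> available = Arrays.asList(100L, 900L, 400L);

    // Old style: pairwise reduction, as mostEmpty() enabled -- yields one winner only.
    long best = available.get(0);
    for (long candidate : available.subList(1, available.size())) {
      best = Math.max(best, candidate);
    }

    // New style: a full descending order, whose first element is the same winner.
    long first = available.stream().max(Comparator.naturalOrder()).get();

    System.out.println(best == first);  // true, both pick 900
  }
}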

io/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java

@@ -25,8 +25,10 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.metamx.emitter.EmittingLogger;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.segment.TestHelper;
+import io.druid.server.metrics.NoopServiceEmitter;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.Interval;
@@ -64,6 +66,7 @@ public class SegmentLoaderLocalCacheManagerTest
   @Before
   public void setUp() throws Exception
   {
+    EmittingLogger.registerEmitter(new NoopServiceEmitter());
     localSegmentCacheFolder = tmpFolder.newFolder("segment_cache_folder");
     final List<StorageLocationConfig> locations = Lists.newArrayList();
@@ -130,6 +133,167 @@ public class SegmentLoaderLocalCacheManagerTest
     Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload));
   }
+
+  @Test
+  public void testRetrySuccessAtFirstLocation() throws Exception
+  {
+    final File localStorageFolder = tmpFolder.newFolder("local_storage_folder");
+    final List<StorageLocationConfig> locations = Lists.newArrayList();
+    final StorageLocationConfig locationConfig = new StorageLocationConfig();
+    locationConfig.setPath(localStorageFolder);
+    locationConfig.setMaxSize(10000000000L);
+    locations.add(locationConfig);
+    final StorageLocationConfig locationConfig2 = new StorageLocationConfig();
+    final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2");
+    locationConfig2.setPath(localStorageFolder2);
+    locationConfig2.setMaxSize(1000000000L);
+    locations.add(locationConfig2);
+
+    manager = new SegmentLoaderLocalCacheManager(
+        new MMappedQueryableIndexFactory(TestHelper.getTestIndexIO()),
+        new SegmentLoaderConfig().withLocations(locations),
+        jsonMapper
+    );
+
+    final File segmentSrcFolder = tmpFolder.newFolder("segmentSrcFolder");
+    final DataSegment segmentToDownload = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withLoadSpec(
+        ImmutableMap.<String, Object>of(
+            "type",
+            "local",
+            "path",
+            segmentSrcFolder.getCanonicalPath()
+            + "/test_segment_loader"
+            + "/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
+            + "/0/index.zip"
+        )
+    );
+    // manually create a local segment under segmentSrcFolder
+    final File localSegmentFile = new File(
+        segmentSrcFolder,
+        "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"
+    );
+    localSegmentFile.mkdirs();
+    final File indexZip = new File(localSegmentFile, "index.zip");
+    indexZip.createNewFile();
+
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload));
+
+    File segmentFile = manager.getSegmentFiles(segmentToDownload);
+    Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/"));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
+
+    manager.cleanup(segmentToDownload);
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload));
+  }
+
+  @Test
+  public void testRetrySuccessAtSecondLocation() throws Exception
+  {
+    final List<StorageLocationConfig> locations = Lists.newArrayList();
+    final StorageLocationConfig locationConfig = new StorageLocationConfig();
+    final File localStorageFolder = tmpFolder.newFolder("local_storage_folder");
+    // mock can't write in first location
+    localStorageFolder.setWritable(false);
+    locationConfig.setPath(localStorageFolder);
+    locationConfig.setMaxSize(1000000000L);
+    locations.add(locationConfig);
+    final StorageLocationConfig locationConfig2 = new StorageLocationConfig();
+    final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2");
+    locationConfig2.setPath(localStorageFolder2);
+    locationConfig2.setMaxSize(10000000L);
+    locations.add(locationConfig2);
+
+    manager = new SegmentLoaderLocalCacheManager(
+        new MMappedQueryableIndexFactory(TestHelper.getTestIndexIO()),
+        new SegmentLoaderConfig().withLocations(locations),
+        jsonMapper
+    );
+
+    final File segmentSrcFolder = tmpFolder.newFolder("segmentSrcFolder");
+    final DataSegment segmentToDownload = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withLoadSpec(
+        ImmutableMap.<String, Object>of(
+            "type",
+            "local",
+            "path",
+            segmentSrcFolder.getCanonicalPath()
+            + "/test_segment_loader"
+            + "/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
+            + "/0/index.zip"
+        )
+    );
+    // manually create a local segment under segmentSrcFolder
+    final File localSegmentFile = new File(
+        segmentSrcFolder,
+        "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"
+    );
+    localSegmentFile.mkdirs();
+    final File indexZip = new File(localSegmentFile, "index.zip");
+    indexZip.createNewFile();
+
+    Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload));
+
+    File segmentFile = manager.getSegmentFiles(segmentToDownload);
+    Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder2/"));
+    Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));
+
+    manager.cleanup(segmentToDownload);
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload));
+  }
+
+  @Test
+  public void testRetryAllFail() throws Exception
+  {
+    final List<StorageLocationConfig> locations = Lists.newArrayList();
+    final StorageLocationConfig locationConfig = new StorageLocationConfig();
+    final File localStorageFolder = tmpFolder.newFolder("local_storage_folder");
+    // mock can't write in first location
+    localStorageFolder.setWritable(false);
+    locationConfig.setPath(localStorageFolder);
+    locationConfig.setMaxSize(1000000000L);
+    locations.add(locationConfig);
+    final StorageLocationConfig locationConfig2 = new StorageLocationConfig();
+    final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2");
+    // mock can't write in second location
+    localStorageFolder2.setWritable(false);
+    locationConfig2.setPath(localStorageFolder2);
+    locationConfig2.setMaxSize(10000000L);
+    locations.add(locationConfig2);
+
+    manager = new SegmentLoaderLocalCacheManager(
+        new MMappedQueryableIndexFactory(TestHelper.getTestIndexIO()),
+        new SegmentLoaderConfig().withLocations(locations),
+        jsonMapper
+    );
+
+    final File segmentSrcFolder = tmpFolder.newFolder("segmentSrcFolder");
+    final DataSegment segmentToDownload = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withLoadSpec(
+        ImmutableMap.<String, Object>of(
+            "type",
+            "local",
+            "path",
+            segmentSrcFolder.getCanonicalPath()
+            + "/test_segment_loader"
+            + "/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
+            + "/0/index.zip"
+        )
+    );
+    // manually create a local segment under segmentSrcFolder
+    final File localSegmentFile = new File(
+        segmentSrcFolder,
+        "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"
+    );
+    localSegmentFile.mkdirs();
+    final File indexZip = new File(localSegmentFile, "index.zip");
+    indexZip.createNewFile();
+
+    try {
+      // expect failure
+      manager.getSegmentFiles(segmentToDownload);
+      Assert.fail();
+    }
+    catch (SegmentLoadingException e) {
+    }
+
+    Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload));
+    manager.cleanup(segmentToDownload);
+  }
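All three retry tests fake a broken disk by revoking write permission on a location's directory. A practical caveat when running them: File.setWritable(false) has no effect for the root user on POSIX systems (root bypasses permission checks), so the second and third tests can fail spuriously inside root-run containers. A defensive variant using JUnit's Assume (a sketch, not part of the committed test):

import java.io.File;
import org.junit.Assume;

public class WriteProtect
{
  // Revoke write access, or skip the test when the platform or user ignores the flag.
  static void makeUnwritableOrSkip(File dir)
  {
    Assume.assumeTrue(
        "could not revoke write permission (running as root?)",
        dir.setWritable(false) && !dir.canWrite()
    );
  }
}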
   private DataSegment dataSegmentWithInterval(String intervalStr)
   {
     return DataSegment.builder()