YARN-5277. When localizers fail due to resource timestamps being out, provide more diagnostics. Contributed by Siddharth Ahuja
This commit is contained in:
parent
f473473355
commit
4bd37f2283
|
@ -32,6 +32,7 @@ import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||||
|
@ -269,9 +270,12 @@ public class FSDownload implements Callable<Path> {
|
||||||
FileSystem sourceFs = sCopy.getFileSystem(conf);
|
FileSystem sourceFs = sCopy.getFileSystem(conf);
|
||||||
FileStatus sStat = sourceFs.getFileStatus(sCopy);
|
FileStatus sStat = sourceFs.getFileStatus(sCopy);
|
||||||
if (sStat.getModificationTime() != resource.getTimestamp()) {
|
if (sStat.getModificationTime() != resource.getTimestamp()) {
|
||||||
throw new IOException("Resource " + sCopy +
|
throw new IOException("Resource " + sCopy + " changed on src filesystem" +
|
||||||
" changed on src filesystem (expected " + resource.getTimestamp() +
|
" - expected: " +
|
||||||
", was " + sStat.getModificationTime());
|
"\"" + Times.formatISO8601(resource.getTimestamp()) + "\"" +
|
||||||
|
", was: " +
|
||||||
|
"\"" + Times.formatISO8601(sStat.getModificationTime()) + "\"" +
|
||||||
|
", current time: " + "\"" + Times.formatISO8601(Time.now()) + "\"");
|
||||||
}
|
}
|
||||||
if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) {
|
if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) {
|
||||||
if (!isPublic(sourceFs, sCopy, sStat, statCache)) {
|
if (!isPublic(sourceFs, sCopy, sStat, statCache)) {
|
||||||
|
|
|
@ -711,4 +711,78 @@ public class TestFSDownload {
|
||||||
// destination directory (passed as an argument) + file name.
|
// destination directory (passed as an argument) + file name.
|
||||||
Assert.assertEquals(destPath, rPath.get().getParent());
|
Assert.assertEquals(destPath, rPath.get().getParent());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test method is responsible for creating an IOException resulting
|
||||||
|
* from modification to the local resource's timestamp on the source FS just
|
||||||
|
* before the download of this local resource has started.
|
||||||
|
*/
|
||||||
|
@Test(timeout=10000)
|
||||||
|
public void testResourceTimestampChangeDuringDownload()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
conf = new Configuration();
|
||||||
|
FileContext files = FileContext.getLocalFSFileContext(conf);
|
||||||
|
final Path basedir = files.makeQualified(
|
||||||
|
new Path("target", TestFSDownload.class.getSimpleName()));
|
||||||
|
files.mkdir(basedir, null, true);
|
||||||
|
conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
|
||||||
|
|
||||||
|
LocalDirAllocator dirs =
|
||||||
|
new LocalDirAllocator(TestFSDownload.class.getName());
|
||||||
|
|
||||||
|
Path path = new Path(basedir, "test-file");
|
||||||
|
Random rand = new Random();
|
||||||
|
long sharedSeed = rand.nextLong();
|
||||||
|
rand.setSeed(sharedSeed);
|
||||||
|
int size = 512;
|
||||||
|
LocalResourceVisibility vis = LocalResourceVisibility.PUBLIC;
|
||||||
|
LocalResource localResource = createFile(files, path, size, rand, vis);
|
||||||
|
|
||||||
|
Path destPath = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
|
||||||
|
destPath = new Path(destPath,
|
||||||
|
Long.toString(uniqueNumberGenerator.incrementAndGet()));
|
||||||
|
|
||||||
|
FSDownload fsDownload = new FSDownload(files,
|
||||||
|
UserGroupInformation.getCurrentUser(), conf, destPath, localResource);
|
||||||
|
|
||||||
|
// Store the original local resource timestamp used to set up the
|
||||||
|
// FSDownload object just before (but before the download starts)
|
||||||
|
// for comparison purposes later on.
|
||||||
|
long origLRTimestamp = localResource.getTimestamp();
|
||||||
|
|
||||||
|
// Modify the local resource's timestamp to yesterday on the Filesystem
|
||||||
|
// just before FSDownload starts.
|
||||||
|
final long msInADay = 86400 * 1000;
|
||||||
|
long modifiedFSTimestamp = origLRTimestamp - msInADay;
|
||||||
|
try {
|
||||||
|
Path sourceFsPath = localResource.getResource().toPath();
|
||||||
|
FileSystem sourceFs = sourceFsPath.getFileSystem(conf);
|
||||||
|
sourceFs.setTimes(sourceFsPath, modifiedFSTimestamp, modifiedFSTimestamp);
|
||||||
|
} catch (URISyntaxException use) {
|
||||||
|
Assert.fail("No exception expected.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute the FSDownload operation.
|
||||||
|
Map<LocalResource, Future<Path>> pending = new HashMap<>();
|
||||||
|
ExecutorService exec = HadoopExecutors.newSingleThreadExecutor();
|
||||||
|
pending.put(localResource, exec.submit(fsDownload));
|
||||||
|
|
||||||
|
exec.shutdown();
|
||||||
|
|
||||||
|
exec.awaitTermination(1000, TimeUnit.MILLISECONDS);
|
||||||
|
Assert.assertTrue(pending.get(localResource).isDone());
|
||||||
|
|
||||||
|
try {
|
||||||
|
for (Map.Entry<LocalResource, Future<Path>> p : pending.entrySet()) {
|
||||||
|
p.getValue().get();
|
||||||
|
}
|
||||||
|
Assert.fail("Exception expected from timestamp update during download");
|
||||||
|
} catch (ExecutionException ee) {
|
||||||
|
Assert.assertTrue(ee.getCause() instanceof IOException);
|
||||||
|
Assert.assertTrue("Exception contains original timestamp",
|
||||||
|
ee.getMessage().contains(Times.formatISO8601(origLRTimestamp)));
|
||||||
|
Assert.assertTrue("Exception contains modified timestamp",
|
||||||
|
ee.getMessage().contains(Times.formatISO8601(modifiedFSTimestamp)));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue