diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java index e7369a99039..0de1badcf29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java @@ -32,6 +32,7 @@ import java.util.regex.Pattern; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; @@ -269,9 +270,12 @@ public class FSDownload implements Callable { FileSystem sourceFs = sCopy.getFileSystem(conf); FileStatus sStat = sourceFs.getFileStatus(sCopy); if (sStat.getModificationTime() != resource.getTimestamp()) { - throw new IOException("Resource " + sCopy + - " changed on src filesystem (expected " + resource.getTimestamp() + - ", was " + sStat.getModificationTime()); + throw new IOException("Resource " + sCopy + " changed on src filesystem" + + " - expected: " + + "\"" + Times.formatISO8601(resource.getTimestamp()) + "\"" + + ", was: " + + "\"" + Times.formatISO8601(sStat.getModificationTime()) + "\"" + + ", current time: " + "\"" + Times.formatISO8601(Time.now()) + "\""); } if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) { if (!isPublic(sourceFs, sCopy, sStat, statCache)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java index 678687fa582..c0b4b9216bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java @@ -711,4 +711,78 @@ public class TestFSDownload { // destination directory (passed as an argument) + file name. Assert.assertEquals(destPath, rPath.get().getParent()); } + + /** + * This test method is responsible for creating an IOException resulting + * from modification to the local resource's timestamp on the source FS just + * before the download of this local resource has started. + */ + @Test(timeout=10000) + public void testResourceTimestampChangeDuringDownload() + throws IOException, InterruptedException { + conf = new Configuration(); + FileContext files = FileContext.getLocalFSFileContext(conf); + final Path basedir = files.makeQualified( + new Path("target", TestFSDownload.class.getSimpleName())); + files.mkdir(basedir, null, true); + conf.setStrings(TestFSDownload.class.getName(), basedir.toString()); + + LocalDirAllocator dirs = + new LocalDirAllocator(TestFSDownload.class.getName()); + + Path path = new Path(basedir, "test-file"); + Random rand = new Random(); + long sharedSeed = rand.nextLong(); + rand.setSeed(sharedSeed); + int size = 512; + LocalResourceVisibility vis = LocalResourceVisibility.PUBLIC; + LocalResource localResource = createFile(files, path, size, rand, vis); + + Path destPath = dirs.getLocalPathForWrite(basedir.toString(), size, conf); + destPath = new Path(destPath, + Long.toString(uniqueNumberGenerator.incrementAndGet())); + + FSDownload fsDownload = new FSDownload(files, + UserGroupInformation.getCurrentUser(), conf, destPath, localResource); + + // Store the original local resource timestamp used to set up the + // FSDownload object just before (but before the download starts) + // for comparison purposes later on. + long origLRTimestamp = localResource.getTimestamp(); + + // Modify the local resource's timestamp to yesterday on the Filesystem + // just before FSDownload starts. + final long msInADay = 86400 * 1000; + long modifiedFSTimestamp = origLRTimestamp - msInADay; + try { + Path sourceFsPath = localResource.getResource().toPath(); + FileSystem sourceFs = sourceFsPath.getFileSystem(conf); + sourceFs.setTimes(sourceFsPath, modifiedFSTimestamp, modifiedFSTimestamp); + } catch (URISyntaxException use) { + Assert.fail("No exception expected."); + } + + // Execute the FSDownload operation. + Map> pending = new HashMap<>(); + ExecutorService exec = HadoopExecutors.newSingleThreadExecutor(); + pending.put(localResource, exec.submit(fsDownload)); + + exec.shutdown(); + + exec.awaitTermination(1000, TimeUnit.MILLISECONDS); + Assert.assertTrue(pending.get(localResource).isDone()); + + try { + for (Map.Entry> p : pending.entrySet()) { + p.getValue().get(); + } + Assert.fail("Exception expected from timestamp update during download"); + } catch (ExecutionException ee) { + Assert.assertTrue(ee.getCause() instanceof IOException); + Assert.assertTrue("Exception contains original timestamp", + ee.getMessage().contains(Times.formatISO8601(origLRTimestamp))); + Assert.assertTrue("Exception contains modified timestamp", + ee.getMessage().contains(Times.formatISO8601(modifiedFSTimestamp))); + } + } }