MAPREDUCE-6862. Fragments are not handled correctly by resource limit checking. (Chris Trezzo via mingma)

(cherry picked from commit ceab00ac62)
This commit is contained in:
Ming Ma 2017-03-29 17:41:58 -07:00
parent 3fe7d36e72
commit 2ae9ae1864
2 changed files with 59 additions and 17 deletions

View File

@ -238,28 +238,42 @@ void checkLocalizationLimits(Configuration conf, Collection<String> files,
Collection<String> dcArchives =
conf.getStringCollection(MRJobConfig.CACHE_ARCHIVES);
for (String path : dcFiles) {
explorePath(conf, new Path(path), limitChecker, statCache);
for (String uri : dcFiles) {
explorePath(conf, stringToPath(uri), limitChecker, statCache);
}
for (String path : dcArchives) {
explorePath(conf, new Path(path), limitChecker, statCache);
for (String uri : dcArchives) {
explorePath(conf, stringToPath(uri), limitChecker, statCache);
}
for (String path : files) {
explorePath(conf, new Path(path), limitChecker, statCache);
for (String uri : files) {
explorePath(conf, stringToPath(uri), limitChecker, statCache);
}
for (String path : libjars) {
explorePath(conf, new Path(path), limitChecker, statCache);
for (String uri : libjars) {
explorePath(conf, stringToPath(uri), limitChecker, statCache);
}
for (String path : archives) {
explorePath(conf, new Path(path), limitChecker, statCache);
for (String uri : archives) {
explorePath(conf, stringToPath(uri), limitChecker, statCache);
}
if (jobJar != null) {
explorePath(conf, new Path(jobJar), limitChecker, statCache);
explorePath(conf, stringToPath(jobJar), limitChecker, statCache);
}
}
/**
* Convert a String to a Path and gracefully remove fragments/queries if they
* exist in the String.
*/
@VisibleForTesting
Path stringToPath(String s) {
try {
URI uri = new URI(s);
return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}

View File

@ -39,6 +39,34 @@
*/
public class TestJobResourceUploader {
@Test
public void testStringToPath() throws IOException {
Configuration conf = new Configuration();
JobResourceUploader uploader =
new JobResourceUploader(FileSystem.getLocal(conf), false);
Assert.assertEquals("Failed: absolute, no scheme, with fragment",
"/testWithFragment.txt",
uploader.stringToPath("/testWithFragment.txt#fragment.txt").toString());
Assert.assertEquals("Failed: absolute, with scheme, with fragment",
"file:/testWithFragment.txt",
uploader.stringToPath("file:///testWithFragment.txt#fragment.txt")
.toString());
Assert.assertEquals("Failed: relative, no scheme, with fragment",
"testWithFragment.txt",
uploader.stringToPath("testWithFragment.txt#fragment.txt").toString());
Assert.assertEquals("Failed: relative, no scheme, no fragment",
"testWithFragment.txt",
uploader.stringToPath("testWithFragment.txt").toString());
Assert.assertEquals("Failed: absolute, with scheme, no fragment",
"file:/testWithFragment.txt",
uploader.stringToPath("file:///testWithFragment.txt").toString());
}
@Test
public void testAllDefaults() throws IOException {
ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
@ -210,17 +238,17 @@ private JobConf setupJobConf(ResourceLimitsConf rlConf) {
rlConf.maxSingleResourceMB);
conf.set("tmpfiles",
buildPathString("file://tmpFiles", rlConf.numOfTmpFiles));
buildPathString("file:///tmpFiles", rlConf.numOfTmpFiles));
conf.set("tmpjars",
buildPathString("file://tmpjars", rlConf.numOfTmpLibJars));
buildPathString("file:///tmpjars", rlConf.numOfTmpLibJars));
conf.set("tmparchives",
buildPathString("file://tmpArchives", rlConf.numOfTmpArchives));
buildPathString("file:///tmpArchives", rlConf.numOfTmpArchives));
conf.set(MRJobConfig.CACHE_ARCHIVES,
buildPathString("file://cacheArchives", rlConf.numOfDCArchives));
buildPathString("file:///cacheArchives", rlConf.numOfDCArchives));
conf.set(MRJobConfig.CACHE_FILES,
buildPathString("file://cacheFiles", rlConf.numOfDCFiles));
buildPathString("file:///cacheFiles", rlConf.numOfDCFiles));
if (rlConf.jobJar) {
conf.setJar("file://jobjar.jar");
conf.setJar("file:///jobjar.jar");
}
return conf;
}