From 848ae35a9325467cd8299457685dcb23815f69f5 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 17 Aug 2016 16:21:20 +0000 Subject: [PATCH] MAPREDUCE-6690. Limit the number of resources a single map reduce job can submit for localization. Contributed by Chris Trezzo (cherry picked from commit f80a7298325a4626638ee24467e2012442e480d4) --- .../hadoop/mapreduce/JobResourceUploader.java | 214 +++++++++-- .../apache/hadoop/mapreduce/MRJobConfig.java | 28 ++ .../ClientDistributedCacheManager.java | 15 +- .../src/main/resources/mapred-default.xml | 30 ++ .../mapreduce/TestJobResourceUploader.java | 355 ++++++++++++++++++ .../hadoop/mapreduce/v2/TestMRJobs.java | 166 +++++++- 6 files changed, 776 insertions(+), 32 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java index fa4dd86f3d4..15dbc13fb9c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java @@ -21,12 +21,16 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; @@ -34,6 +38,8 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; import org.apache.hadoop.mapreduce.filecache.DistributedCache; +import com.google.common.annotations.VisibleForTesting; + @InterfaceAudience.Private @InterfaceStability.Unstable class JobResourceUploader { @@ -86,31 +92,37 @@ class JobResourceUploader { FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION); FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms); - // add all the command line files/ jars and archive - // first copy them to jobtrackers filesystem - uploadFiles(conf, submitJobDir, mapredSysPerms, replication); - uploadLibJars(conf, submitJobDir, mapredSysPerms, replication); - uploadArchives(conf, submitJobDir, mapredSysPerms, replication); - uploadJobJar(job, submitJobDir, replication); + Collection files = conf.getStringCollection("tmpfiles"); + Collection libjars = conf.getStringCollection("tmpjars"); + Collection archives = conf.getStringCollection("tmparchives"); + String jobJar = job.getJar(); + + Map statCache = new HashMap(); + checkLocalizationLimits(conf, files, libjars, archives, jobJar, statCache); + + uploadFiles(conf, files, submitJobDir, mapredSysPerms, replication); + uploadLibJars(conf, libjars, submitJobDir, mapredSysPerms, replication); + uploadArchives(conf, archives, submitJobDir, mapredSysPerms, replication); + uploadJobJar(job, jobJar, submitJobDir, replication); addLog4jToDistributedCache(job, submitJobDir); // set the timestamps of the archives and files // set the public/private visibility of the archives and files - ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf); + ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf, + statCache); // get DelegationToken for cached file ClientDistributedCacheManager.getDelegationTokens(conf, job.getCredentials()); } - private void uploadFiles(Configuration conf, Path submitJobDir, - FsPermission mapredSysPerms, short submitReplication) throws IOException { - String files = conf.get("tmpfiles"); + private void uploadFiles(Configuration conf, Collection files, + Path submitJobDir, FsPermission mapredSysPerms, short submitReplication) + throws IOException { Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir); - if (files != null) { + if (!files.isEmpty()) { FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms); - String[] fileArr = files.split(","); - for (String tmpFile : fileArr) { + for (String tmpFile : files) { URI tmpURI = null; try { tmpURI = new URI(tmpFile); @@ -130,14 +142,13 @@ class JobResourceUploader { } } - private void uploadLibJars(Configuration conf, Path submitJobDir, - FsPermission mapredSysPerms, short submitReplication) throws IOException { - String libjars = conf.get("tmpjars"); + private void uploadLibJars(Configuration conf, Collection libjars, + Path submitJobDir, FsPermission mapredSysPerms, short submitReplication) + throws IOException { Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir); - if (libjars != null) { + if (!libjars.isEmpty()) { FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms); - String[] libjarsArr = libjars.split(","); - for (String tmpjars : libjarsArr) { + for (String tmpjars : libjars) { Path tmp = new Path(tmpjars); Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, submitReplication); @@ -157,14 +168,13 @@ class JobResourceUploader { } } - private void uploadArchives(Configuration conf, Path submitJobDir, - FsPermission mapredSysPerms, short submitReplication) throws IOException { - String archives = conf.get("tmparchives"); + private void uploadArchives(Configuration conf, Collection archives, + Path submitJobDir, FsPermission mapredSysPerms, short submitReplication) + throws IOException { Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir); - if (archives != null) { + if (!archives.isEmpty()) { FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms); - String[] archivesArr = archives.split(","); - for (String tmpArchives : archivesArr) { + for (String tmpArchives : archives) { URI tmpURI; try { tmpURI = new URI(tmpArchives); @@ -185,9 +195,8 @@ class JobResourceUploader { } } - private void uploadJobJar(Job job, Path submitJobDir, short submitReplication) - throws IOException { - String jobJar = job.getJar(); + private void uploadJobJar(Job job, String jobJar, Path submitJobDir, + short submitReplication) throws IOException { if (jobJar != null) { // copy jar to JobTracker's fs // use jar name if job is not named. if ("".equals(job.getJobName())) { @@ -208,6 +217,155 @@ class JobResourceUploader { } } + /** + * Verify that the resources this job is going to localize are within the + * localization limits. + */ + @VisibleForTesting + void checkLocalizationLimits(Configuration conf, Collection files, + Collection libjars, Collection archives, String jobJar, + Map statCache) throws IOException { + + LimitChecker limitChecker = new LimitChecker(conf); + if (!limitChecker.hasLimits()) { + // there are no limits set, so we are done. + return; + } + + // Get the files and archives that are already in the distributed cache + Collection dcFiles = + conf.getStringCollection(MRJobConfig.CACHE_FILES); + Collection dcArchives = + conf.getStringCollection(MRJobConfig.CACHE_ARCHIVES); + + for (String path : dcFiles) { + explorePath(conf, new Path(path), limitChecker, statCache); + } + + for (String path : dcArchives) { + explorePath(conf, new Path(path), limitChecker, statCache); + } + + for (String path : files) { + explorePath(conf, new Path(path), limitChecker, statCache); + } + + for (String path : libjars) { + explorePath(conf, new Path(path), limitChecker, statCache); + } + + for (String path : archives) { + explorePath(conf, new Path(path), limitChecker, statCache); + } + + if (jobJar != null) { + explorePath(conf, new Path(jobJar), limitChecker, statCache); + } + } + + @VisibleForTesting + protected static final String MAX_RESOURCE_ERR_MSG = + "This job has exceeded the maximum number of submitted resources"; + @VisibleForTesting + protected static final String MAX_TOTAL_RESOURCE_MB_ERR_MSG = + "This job has exceeded the maximum size of submitted resources"; + @VisibleForTesting + protected static final String MAX_SINGLE_RESOURCE_MB_ERR_MSG = + "This job has exceeded the maximum size of a single submitted resource"; + + private static class LimitChecker { + LimitChecker(Configuration conf) { + this.maxNumOfResources = + conf.getInt(MRJobConfig.MAX_RESOURCES, + MRJobConfig.MAX_RESOURCES_DEFAULT); + this.maxSizeMB = + conf.getLong(MRJobConfig.MAX_RESOURCES_MB, + MRJobConfig.MAX_RESOURCES_MB_DEFAULT); + this.maxSizeOfResourceMB = + conf.getLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB, + MRJobConfig.MAX_SINGLE_RESOURCE_MB_DEFAULT); + this.totalConfigSizeBytes = maxSizeMB * 1024 * 1024; + this.totalConfigSizeOfResourceBytes = maxSizeOfResourceMB * 1024 * 1024; + } + + private long totalSizeBytes = 0; + private int totalNumberOfResources = 0; + private long currentMaxSizeOfFileBytes = 0; + private final long maxSizeMB; + private final int maxNumOfResources; + private final long maxSizeOfResourceMB; + private final long totalConfigSizeBytes; + private final long totalConfigSizeOfResourceBytes; + + private boolean hasLimits() { + return maxNumOfResources > 0 || maxSizeMB > 0 || maxSizeOfResourceMB > 0; + } + + private void addFile(Path p, long fileSizeBytes) throws IOException { + totalNumberOfResources++; + totalSizeBytes += fileSizeBytes; + if (fileSizeBytes > currentMaxSizeOfFileBytes) { + currentMaxSizeOfFileBytes = fileSizeBytes; + } + + if (totalConfigSizeBytes > 0 && totalSizeBytes > totalConfigSizeBytes) { + throw new IOException(MAX_TOTAL_RESOURCE_MB_ERR_MSG + " (Max: " + + maxSizeMB + "MB)."); + } + + if (maxNumOfResources > 0 && + totalNumberOfResources > maxNumOfResources) { + throw new IOException(MAX_RESOURCE_ERR_MSG + " (Max: " + + maxNumOfResources + ")."); + } + + if (totalConfigSizeOfResourceBytes > 0 + && currentMaxSizeOfFileBytes > totalConfigSizeOfResourceBytes) { + throw new IOException(MAX_SINGLE_RESOURCE_MB_ERR_MSG + " (Max: " + + maxSizeOfResourceMB + "MB, Violating resource: " + p + ")."); + } + } + } + + /** + * Recursively explore the given path and enforce the limits for resource + * localization. This method assumes that there are no symlinks in the + * directory structure. + */ + private void explorePath(Configuration job, Path p, + LimitChecker limitChecker, Map statCache) + throws IOException { + Path pathWithScheme = p; + if (!pathWithScheme.toUri().isAbsolute()) { + // the path does not have a scheme, so we assume it is a path from the + // local filesystem + FileSystem localFs = FileSystem.getLocal(job); + pathWithScheme = localFs.makeQualified(p); + } + FileStatus status = getFileStatus(statCache, job, pathWithScheme); + if (status.isDirectory()) { + FileStatus[] statusArray = + pathWithScheme.getFileSystem(job).listStatus(pathWithScheme); + for (FileStatus s : statusArray) { + explorePath(job, s.getPath(), limitChecker, statCache); + } + } else { + limitChecker.addFile(pathWithScheme, status.getLen()); + } + } + + @VisibleForTesting + FileStatus getFileStatus(Map statCache, + Configuration job, Path p) throws IOException { + URI u = p.toUri(); + FileStatus status = statCache.get(u); + if (status == null) { + status = p.getFileSystem(job).getFileStatus(p); + statCache.put(u, status); + } + return status; + } + // copies a file to the jobtracker filesystem and returns the path where it // was copied to private Path copyRemoteFiles(Path parentDir, Path originalPath, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 706092e2792..9a88669fe80 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -949,6 +949,34 @@ public interface MRJobConfig { public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB = 128; + /** + * The maximum number of resources a map reduce job is allowed to submit for + * localization via files, libjars, archives, and jobjar command line + * arguments and through the distributed cache. If set to 0 the limit is + * ignored. + */ + String MAX_RESOURCES = "mapreduce.job.cache.limit.max-resources"; + int MAX_RESOURCES_DEFAULT = 0; + + /** + * The maximum size (in MB) a map reduce job is allowed to submit for + * localization via files, libjars, archives, and jobjar command line + * arguments and through the distributed cache. If set to 0 the limit is + * ignored. + */ + String MAX_RESOURCES_MB = "mapreduce.job.cache.limit.max-resources-mb"; + long MAX_RESOURCES_MB_DEFAULT = 0; + + /** + * The maximum size (in MB) of a single resource a map reduce job is allow to + * submit for localization via files, libjars, archives, and jobjar command + * line arguments and through the distributed cache. If set to 0 the limit is + * ignored. + */ + String MAX_SINGLE_RESOURCE_MB = + "mapreduce.job.cache.limit.max-single-resource-mb"; + long MAX_SINGLE_RESOURCE_MB_DEFAULT = 0; + /** * Number of OPPORTUNISTIC Containers per 100 containers that will be * requested by the MRAppMaster. The Default value is 0, which implies all diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java index 19470e82848..73a0330396d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java @@ -54,10 +54,23 @@ public class ClientDistributedCacheManager { public static void determineTimestampsAndCacheVisibilities(Configuration job) throws IOException { Map statCache = new HashMap(); + determineTimestampsAndCacheVisibilities(job, statCache); + } + + /** + * See ClientDistributedCacheManager#determineTimestampsAndCacheVisibilities( + * Configuration). + * + * @param job Configuration of a job + * @param statCache A map containing cached file status objects + * @throws IOException if there is a problem with the underlying filesystem + */ + public static void determineTimestampsAndCacheVisibilities(Configuration job, + Map statCache) throws IOException { determineTimestamps(job, statCache); determineCacheVisibilities(job, statCache); } - + /** * Determines timestamps of files to be cached, and stores those * in the configuration. This is intended to be used internally by JobClient diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index b648e867283..4a8d6e211b5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1881,6 +1881,36 @@ GET,OPTIONS,HEAD + + mapreduce.job.cache.limit.max-resources + 0 + The maximum number of resources a map reduce job is allowed to + submit for localization via files, libjars, archives, and jobjar command + line arguments and through the distributed cache. If set to 0 the limit is + ignored. + + + + + mapreduce.job.cache.limit.max-resources-mb + 0 + The maximum size (in MB) a map reduce job is allowed to submit + for localization via files, libjars, archives, and jobjar command line + arguments and through the distributed cache. If set to 0 the limit is + ignored. + + + + + mapreduce.job.cache.limit.max-single-resource-mb + 0 + The maximum size (in MB) of a single resource a map reduce job + is allow to submit for localization via files, libjars, archives, and + jobjar command line arguments and through the distributed cache. If set to + 0 the limit is ignored. + + + Value of the xframe-options diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java new file mode 100644 index 00000000000..36ea57af21d --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.junit.Assert; +import org.junit.Test; + +/** + * A class for unit testing JobResourceUploader. + */ +public class TestJobResourceUploader { + + @Test + public void testAllDefaults() throws IOException { + ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder(); + runLimitsTest(b.build(), true, null); + } + + @Test + public void testNoLimitsWithResources() throws IOException { + ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder(); + b.setNumOfDCArchives(1); + b.setNumOfDCFiles(1); + b.setNumOfTmpArchives(10); + b.setNumOfTmpFiles(1); + b.setNumOfTmpLibJars(1); + b.setJobJar(true); + b.setSizeOfResource(10); + runLimitsTest(b.build(), true, null); + } + + @Test + public void testAtResourceLimit() throws IOException { + ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder(); + b.setNumOfDCArchives(1); + b.setNumOfDCFiles(1); + b.setNumOfTmpArchives(1); + b.setNumOfTmpFiles(1); + b.setNumOfTmpLibJars(1); + b.setJobJar(true); + b.setMaxResources(6); + runLimitsTest(b.build(), true, null); + } + + @Test + public void testOverResourceLimit() throws IOException { + ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder(); + b.setNumOfDCArchives(1); + b.setNumOfDCFiles(1); + b.setNumOfTmpArchives(1); + b.setNumOfTmpFiles(2); + b.setNumOfTmpLibJars(1); + b.setJobJar(true); + b.setMaxResources(6); + runLimitsTest(b.build(), false, ResourceViolation.NUMBER_OF_RESOURCES); + } + + @Test + public void testAtResourcesMBLimit() throws IOException { + ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder(); + b.setNumOfDCArchives(1); + b.setNumOfDCFiles(1); + b.setNumOfTmpArchives(1); + b.setNumOfTmpFiles(2); + b.setNumOfTmpLibJars(1); + b.setJobJar(true); + b.setMaxResourcesMB(7); + b.setSizeOfResource(1); + runLimitsTest(b.build(), true, null); + } + + @Test + public void testOverResourcesMBLimit() throws IOException { + ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder(); + b.setNumOfDCArchives(1); + b.setNumOfDCFiles(2); + b.setNumOfTmpArchives(1); + b.setNumOfTmpFiles(2); + b.setNumOfTmpLibJars(1); + b.setJobJar(true); + b.setMaxResourcesMB(7); + b.setSizeOfResource(1); + runLimitsTest(b.build(), false, ResourceViolation.TOTAL_RESOURCE_SIZE); + } + + @Test + public void testAtSingleResourceMBLimit() throws IOException { + ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder(); + b.setNumOfDCArchives(1); + b.setNumOfDCFiles(2); + b.setNumOfTmpArchives(1); + b.setNumOfTmpFiles(2); + b.setNumOfTmpLibJars(1); + b.setJobJar(true); + b.setMaxSingleResourceMB(1); + b.setSizeOfResource(1); + runLimitsTest(b.build(), true, null); + } + + @Test + public void testOverSingleResourceMBLimit() throws IOException { + ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder(); + b.setNumOfDCArchives(1); + b.setNumOfDCFiles(2); + b.setNumOfTmpArchives(1); + b.setNumOfTmpFiles(2); + b.setNumOfTmpLibJars(1); + b.setJobJar(true); + b.setMaxSingleResourceMB(1); + b.setSizeOfResource(10); + runLimitsTest(b.build(), false, ResourceViolation.SINGLE_RESOURCE_SIZE); + } + + private enum ResourceViolation { + NUMBER_OF_RESOURCES, TOTAL_RESOURCE_SIZE, SINGLE_RESOURCE_SIZE; + } + + private void runLimitsTest(ResourceLimitsConf rlConf, + boolean checkShouldSucceed, ResourceViolation violation) + throws IOException { + + if (!checkShouldSucceed && violation == null) { + Assert.fail("Test is misconfigured. checkShouldSucceed is set to false" + + " and a ResourceViolation is not specified."); + } + + JobConf conf = setupJobConf(rlConf); + JobResourceUploader uploader = new StubedUploader(conf); + long configuredSizeOfResourceBytes = rlConf.sizeOfResource * 1024 * 1024; + when(mockedStatus.getLen()).thenReturn(configuredSizeOfResourceBytes); + when(mockedStatus.isDirectory()).thenReturn(false); + Map statCache = new HashMap(); + try { + uploader.checkLocalizationLimits(conf, + conf.getStringCollection("tmpfiles"), + conf.getStringCollection("tmpjars"), + conf.getStringCollection("tmparchives"), + conf.getJar(), statCache); + Assert.assertTrue("Limits check succeeded when it should have failed.", + checkShouldSucceed); + } catch (IOException e) { + if (checkShouldSucceed) { + Assert.fail("Limits check failed when it should have succeeded: " + e); + } + switch (violation) { + case NUMBER_OF_RESOURCES: + if (!e.getMessage().contains( + JobResourceUploader.MAX_RESOURCE_ERR_MSG)) { + Assert.fail("Test failed unexpectedly: " + e); + } + break; + + case TOTAL_RESOURCE_SIZE: + if (!e.getMessage().contains( + JobResourceUploader.MAX_TOTAL_RESOURCE_MB_ERR_MSG)) { + Assert.fail("Test failed unexpectedly: " + e); + } + break; + + case SINGLE_RESOURCE_SIZE: + if (!e.getMessage().contains( + JobResourceUploader.MAX_SINGLE_RESOURCE_MB_ERR_MSG)) { + Assert.fail("Test failed unexpectedly: " + e); + } + break; + + default: + Assert.fail("Test failed unexpectedly: " + e); + break; + } + } + } + + private final FileStatus mockedStatus = mock(FileStatus.class); + + private JobConf setupJobConf(ResourceLimitsConf rlConf) { + JobConf conf = new JobConf(); + conf.setInt(MRJobConfig.MAX_RESOURCES, rlConf.maxResources); + conf.setLong(MRJobConfig.MAX_RESOURCES_MB, rlConf.maxResourcesMB); + conf.setLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB, + rlConf.maxSingleResourceMB); + + conf.set("tmpfiles", + buildPathString("file://tmpFiles", rlConf.numOfTmpFiles)); + conf.set("tmpjars", + buildPathString("file://tmpjars", rlConf.numOfTmpLibJars)); + conf.set("tmparchives", + buildPathString("file://tmpArchives", rlConf.numOfTmpArchives)); + conf.set(MRJobConfig.CACHE_ARCHIVES, + buildPathString("file://cacheArchives", rlConf.numOfDCArchives)); + conf.set(MRJobConfig.CACHE_FILES, + buildPathString("file://cacheFiles", rlConf.numOfDCFiles)); + if (rlConf.jobJar) { + conf.setJar("file://jobjar.jar"); + } + return conf; + } + + private String buildPathString(String pathPrefix, int numOfPaths) { + if (numOfPaths < 1) { + return ""; + } else { + StringBuilder b = new StringBuilder(); + b.append(pathPrefix + 0); + for (int i = 1; i < numOfPaths; i++) { + b.append("," + pathPrefix + i); + } + return b.toString(); + } + } + + final static class ResourceLimitsConf { + private final int maxResources; + private final long maxResourcesMB; + private final long maxSingleResourceMB; + private final int numOfTmpFiles; + private final int numOfTmpArchives; + private final int numOfTmpLibJars; + private final boolean jobJar; + private final int numOfDCFiles; + private final int numOfDCArchives; + private final long sizeOfResource; + + static final ResourceLimitsConf DEFAULT = new ResourceLimitsConf(); + + private ResourceLimitsConf() { + this(new Builder()); + } + + private ResourceLimitsConf(Builder builder) { + this.maxResources = builder.maxResources; + this.maxResourcesMB = builder.maxResourcesMB; + this.maxSingleResourceMB = builder.maxSingleResourceMB; + this.numOfTmpFiles = builder.numOfTmpFiles; + this.numOfTmpArchives = builder.numOfTmpArchives; + this.numOfTmpLibJars = builder.numOfTmpLibJars; + this.jobJar = builder.jobJar; + this.numOfDCFiles = builder.numOfDCFiles; + this.numOfDCArchives = builder.numOfDCArchives; + this.sizeOfResource = builder.sizeOfResource; + } + + static class Builder { + // Defaults + private int maxResources = 0; + private long maxResourcesMB = 0; + private long maxSingleResourceMB = 0; + private int numOfTmpFiles = 0; + private int numOfTmpArchives = 0; + private int numOfTmpLibJars = 0; + private boolean jobJar = false; + private int numOfDCFiles = 0; + private int numOfDCArchives = 0; + private long sizeOfResource = 0; + + Builder() { + } + + Builder setMaxResources(int max) { + this.maxResources = max; + return this; + } + + Builder setMaxResourcesMB(long max) { + this.maxResourcesMB = max; + return this; + } + + Builder setMaxSingleResourceMB(long max) { + this.maxSingleResourceMB = max; + return this; + } + + Builder setNumOfTmpFiles(int num) { + this.numOfTmpFiles = num; + return this; + } + + Builder setNumOfTmpArchives(int num) { + this.numOfTmpArchives = num; + return this; + } + + Builder setNumOfTmpLibJars(int num) { + this.numOfTmpLibJars = num; + return this; + } + + Builder setJobJar(boolean jar) { + this.jobJar = jar; + return this; + } + + Builder setNumOfDCFiles(int num) { + this.numOfDCFiles = num; + return this; + } + + Builder setNumOfDCArchives(int num) { + this.numOfDCArchives = num; + return this; + } + + Builder setSizeOfResource(long sizeMB) { + this.sizeOfResource = sizeMB; + return this; + } + + ResourceLimitsConf build() { + return new ResourceLimitsConf(this); + } + } + } + + class StubedUploader extends JobResourceUploader { + StubedUploader(JobConf conf) throws IOException { + super(FileSystem.getLocal(conf), false); + } + + @Override + FileStatus getFileStatus(Map statCache, Configuration job, + Path p) throws IOException { + return mockedStatus; + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java index 451ec574464..32b3a42a2a6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java @@ -138,6 +138,8 @@ public class TestMRJobs { static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar"); private static final String OUTPUT_ROOT_DIR = "/tmp/" + TestMRJobs.class.getSimpleName(); + private static final Path TEST_RESOURCES_DIR = new Path(TEST_ROOT_DIR, + "localizedResources"); @BeforeClass public static void setup() throws IOException { @@ -173,7 +175,7 @@ public class TestMRJobs { } @AfterClass - public static void tearDown() { + public static void tearDown() throws IOException { if (mrCluster != null) { mrCluster.stop(); mrCluster = null; @@ -182,6 +184,10 @@ public class TestMRJobs { dfsCluster.shutdown(); dfsCluster = null; } + if (localFs.exists(TEST_RESOURCES_DIR)) { + // clean up resource directory + localFs.delete(TEST_RESOURCES_DIR, true); + } } @After @@ -189,6 +195,39 @@ public class TestMRJobs { numSleepReducers = DEFAULT_REDUCES; } + private static void setupJobResourceDirs() throws IOException { + if (localFs.exists(TEST_RESOURCES_DIR)) { + // clean up directory + localFs.delete(TEST_RESOURCES_DIR, true); + } + + localFs.mkdirs(TEST_RESOURCES_DIR); + FSDataOutputStream outF1 = null; + try { + // 10KB file + outF1 = localFs.create(new Path(TEST_RESOURCES_DIR, "file1.txt")); + outF1.write(new byte[10 * 1024]); + } finally { + if (outF1 != null) { + outF1.close(); + } + } + localFs.createNewFile(new Path(TEST_RESOURCES_DIR, "file2.txt")); + Path subDir = new Path(TEST_RESOURCES_DIR, "subDir"); + localFs.mkdirs(subDir); + FSDataOutputStream outF3 = null; + try { + // 1MB (plus 10 Bytes) file + outF3 = localFs.create(new Path(subDir, "file3.txt")); + outF3.write(new byte[(1 * 1024 * 1024) + 10]); + } finally { + if (outF3 != null) { + outF3.close(); + } + } + localFs.createNewFile(new Path(subDir, "file4.txt")); + } + @Test (timeout = 300000) public void testSleepJob() throws Exception { testSleepJobInternal(false); @@ -199,16 +238,99 @@ public class TestMRJobs { testSleepJobInternal(true); } + @Test(timeout = 300000) + public void testSleepJobWithLocalResourceUnderLimit() throws Exception { + Configuration sleepConf = new Configuration(mrCluster.getConfig()); + // set limits to well above what is expected + sleepConf.setInt(MRJobConfig.MAX_RESOURCES, 6); + sleepConf.setLong(MRJobConfig.MAX_RESOURCES_MB, 6); + setupJobResourceDirs(); + sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString()); + testSleepJobInternal(sleepConf, false, true, null); + } + + @Test(timeout = 300000) + public void testSleepJobWithLocalResourceSizeOverLimit() throws Exception { + Configuration sleepConf = new Configuration(mrCluster.getConfig()); + // set limits to well below what is expected + sleepConf.setLong(MRJobConfig.MAX_RESOURCES_MB, 1); + setupJobResourceDirs(); + sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString()); + testSleepJobInternal(sleepConf, false, false, + ResourceViolation.TOTAL_RESOURCE_SIZE); + } + + @Test(timeout = 300000) + public void testSleepJobWithLocalResourceNumberOverLimit() throws Exception { + Configuration sleepConf = new Configuration(mrCluster.getConfig()); + // set limits to well below what is expected + sleepConf.setInt(MRJobConfig.MAX_RESOURCES, 1); + setupJobResourceDirs(); + sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString()); + testSleepJobInternal(sleepConf, false, false, + ResourceViolation.NUMBER_OF_RESOURCES); + } + + @Test(timeout = 300000) + public void testSleepJobWithLocalResourceCheckAndRemoteJar() + throws Exception { + Configuration sleepConf = new Configuration(mrCluster.getConfig()); + // set limits to well above what is expected + sleepConf.setInt(MRJobConfig.MAX_RESOURCES, 6); + sleepConf.setLong(MRJobConfig.MAX_RESOURCES_MB, 6); + setupJobResourceDirs(); + sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString()); + testSleepJobInternal(sleepConf, true, true, null); + } + + @Test(timeout = 300000) + public void testSleepJobWithLocalIndividualResourceOverLimit() + throws Exception { + Configuration sleepConf = new Configuration(mrCluster.getConfig()); + // set limits to well below what is expected + sleepConf.setInt(MRJobConfig.MAX_SINGLE_RESOURCE_MB, 1); + setupJobResourceDirs(); + sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString()); + testSleepJobInternal(sleepConf, false, false, + ResourceViolation.SINGLE_RESOURCE_SIZE); + } + + @Test(timeout = 300000) + public void testSleepJobWithLocalIndividualResourceUnderLimit() + throws Exception { + Configuration sleepConf = new Configuration(mrCluster.getConfig()); + // set limits to well below what is expected + sleepConf.setInt(MRJobConfig.MAX_SINGLE_RESOURCE_MB, 2); + setupJobResourceDirs(); + sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString()); + testSleepJobInternal(sleepConf, false, true, null); + } + private void testSleepJobInternal(boolean useRemoteJar) throws Exception { + testSleepJobInternal(new Configuration(mrCluster.getConfig()), + useRemoteJar, true, null); + } + + private enum ResourceViolation { + NUMBER_OF_RESOURCES, TOTAL_RESOURCE_SIZE, SINGLE_RESOURCE_SIZE; + } + + private void testSleepJobInternal(Configuration sleepConf, + boolean useRemoteJar, boolean jobSubmissionShouldSucceed, + ResourceViolation violation) throws Exception { LOG.info("\n\n\nStarting testSleepJob: useRemoteJar=" + useRemoteJar); + if (!jobSubmissionShouldSucceed && violation == null) { + Assert.fail("Test is misconfigured. jobSubmissionShouldSucceed is set" + + " to false and a ResourceViolation is not specified."); + } + if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test."); return; } - Configuration sleepConf = new Configuration(mrCluster.getConfig()); // set master address to local to test that local mode applied iff framework == local sleepConf.set(MRConfig.MASTER_ADDRESS, "local"); @@ -229,7 +351,45 @@ public class TestMRJobs { job.setJarByClass(SleepJob.class); } job.setMaxMapAttempts(1); // speed up failures - job.submit(); + try { + job.submit(); + Assert.assertTrue("JobSubmission succeeded when it should have failed.", + jobSubmissionShouldSucceed); + } catch (IOException e) { + if (jobSubmissionShouldSucceed) { + Assert + .fail("Job submission failed when it should have succeeded: " + e); + } + switch (violation) { + case NUMBER_OF_RESOURCES: + if (!e.getMessage().contains( + "This job has exceeded the maximum number of" + + " submitted resources")) { + Assert.fail("Test failed unexpectedly: " + e); + } + break; + + case TOTAL_RESOURCE_SIZE: + if (!e.getMessage().contains( + "This job has exceeded the maximum size of submitted resources")) { + Assert.fail("Test failed unexpectedly: " + e); + } + break; + + case SINGLE_RESOURCE_SIZE: + if (!e.getMessage().contains( + "This job has exceeded the maximum size of a single submitted")) { + Assert.fail("Test failed unexpectedly: " + e); + } + break; + + default: + Assert.fail("Test failed unexpectedly: " + e); + break; + } + // we are done with the test (job submission failed) + return; + } String trackingUrl = job.getTrackingURL(); String jobId = job.getJobID().toString(); boolean succeeded = job.waitForCompletion(true);