From e5e91397de906bef9091ae4625dc71de301a7d25 Mon Sep 17 00:00:00 2001 From: zz Date: Sat, 19 Sep 2020 23:10:05 -0700 Subject: [PATCH] MAPREDUCE-7294. Only application master should upload resource to Yarn Shared Cache (#2223) Contributed by Zhenzhao Wang Signed-off-by: Mingliang Liu --- .../mapreduce/v2/app/job/impl/JobImpl.java | 3 +- .../v2/app/job/impl/TestJobImpl.java | 23 +++++++++++++ .../java/org/apache/hadoop/mapreduce/Job.java | 33 ++++++++----------- 3 files changed, 39 insertions(+), 20 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index d2e2492be6c..59320b28057 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -1423,7 +1423,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, * be set up to false. In that way, the NMs that host the task containers * won't try to upload the resources to shared cache. */ - private static void cleanupSharedCacheUploadPolicies(Configuration conf) { + @VisibleForTesting + static void cleanupSharedCacheUploadPolicies(Configuration conf) { Job.setArchiveSharedCacheUploadPolicies(conf, Collections.emptyMap()); Job.setFileSharedCacheUploadPolicies(conf, Collections.emptyMap()); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 945b2543919..43e59a7b345 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -39,6 +39,7 @@ import java.util.concurrent.CyclicBarrier; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobID; @@ -991,6 +992,28 @@ public class TestJobImpl { Assert.assertEquals(updatedPriority, jobPriority); } + @Test + public void testCleanupSharedCacheUploadPolicies() { + Configuration config = new Configuration(); + Map archivePolicies = new HashMap<>(); + archivePolicies.put("archive1", true); + archivePolicies.put("archive2", true); + Job.setArchiveSharedCacheUploadPolicies(config, archivePolicies); + Map filePolicies = new HashMap<>(); + filePolicies.put("file1", true); + filePolicies.put("jar1", true); + Job.setFileSharedCacheUploadPolicies(config, filePolicies); + Assert.assertEquals( + 2, Job.getArchiveSharedCacheUploadPolicies(config).size()); + Assert.assertEquals( + 2, Job.getFileSharedCacheUploadPolicies(config).size()); + JobImpl.cleanupSharedCacheUploadPolicies(config); + Assert.assertEquals( + 0, Job.getArchiveSharedCacheUploadPolicies(config).size()); + Assert.assertEquals( + 0, Job.getFileSharedCacheUploadPolicies(config).size()); + } + private static CommitterEventHandler createCommitterEventHandler( Dispatcher dispatcher, OutputCommitter committer) { final SystemClock clock = SystemClock.getInstance(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java index 31e2057e8df..9a998dacd98 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java @@ -1450,26 +1450,21 @@ public class Job extends JobContextImpl implements JobContext, AutoCloseable { */ private static void setSharedCacheUploadPolicies(Configuration conf, Map policies, boolean areFiles) { - if (policies != null) { - StringBuilder sb = new StringBuilder(); - Iterator> it = policies.entrySet().iterator(); - Map.Entry e; - if (it.hasNext()) { - e = it.next(); - sb.append(e.getKey() + DELIM + e.getValue()); - } else { - // policies is an empty map, just skip setting the parameter - return; - } - while (it.hasNext()) { - e = it.next(); - sb.append("," + e.getKey() + DELIM + e.getValue()); - } - String confParam = - areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES - : MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES; - conf.set(confParam, sb.toString()); + String confParam = areFiles ? + MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES : + MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES; + // If no policy is provided, we will reset the config by setting an empty + // string value. In other words, cleaning up existing policies. This is + // useful when we try to clean up shared cache upload policies for + // non-application master tasks. See MAPREDUCE-7294 for details. + if (policies == null || policies.size() == 0) { + conf.set(confParam, ""); + return; } + StringBuilder sb = new StringBuilder(); + policies.forEach((k,v) -> sb.append(k).append(DELIM).append(v).append(",")); + sb.deleteCharAt(sb.length() - 1); + conf.set(confParam, sb.toString()); } /**