MAPREDUCE-7294. Only application master should upload resource to Yarn Shared Cache (#2223)

Contributed by Zhenzhao Wang <zhenzhaowang@gmail.com>

Signed-off-by: Mingliang Liu <liuml07@apache.org>
Authored by zz on 2020-09-19 23:10:05 -07:00; committed by Mingliang Liu
Parent: 43b113de69
Commit: 06ff4d1416
3 changed files with 39 additions and 20 deletions
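
In code form, the intended behavior is roughly the following. This is a minimal sketch: only the Job.set*/get*SharedCacheUploadPolicies calls are APIs that appear in the diffs below; the harness class and its main() method are illustrative and not part of the commit.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class SharedCacheUploadPolicySketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // The job client / application master records which resources it
        // wants uploaded to the YARN shared cache.
        Map<String, Boolean> filePolicies = new HashMap<>();
        filePolicies.put("file1", true);
        Job.setFileSharedCacheUploadPolicies(conf, filePolicies);

        // JobImpl.cleanupSharedCacheUploadPolicies (now package-private and
        // @VisibleForTesting, see the first diff) does exactly this before the
        // conf reaches the task containers: it passes empty maps to clear the
        // policies.
        Job.setFileSharedCacheUploadPolicies(conf, Collections.emptyMap());
        Job.setArchiveSharedCacheUploadPolicies(conf, Collections.emptyMap());

        // Before this fix, setting an empty map was a no-op, so the old
        // policies survived in the task containers' configuration and every NM
        // hosting a task tried to upload to the shared cache as well. After
        // the fix this prints an empty map.
        System.out.println(Job.getFileSharedCacheUploadPolicies(conf));
      }
    }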

JobImpl.java

@@ -1423,7 +1423,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
    * be set up to false. In that way, the NMs that host the task containers
    * won't try to upload the resources to shared cache.
    */
-  private static void cleanupSharedCacheUploadPolicies(Configuration conf) {
+  @VisibleForTesting
+  static void cleanupSharedCacheUploadPolicies(Configuration conf) {
     Job.setArchiveSharedCacheUploadPolicies(conf, Collections.emptyMap());
     Job.setFileSharedCacheUploadPolicies(conf, Collections.emptyMap());
   }

TestJobImpl.java

@@ -39,6 +39,7 @@ import java.util.concurrent.CyclicBarrier;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -991,6 +992,28 @@ public class TestJobImpl {
     Assert.assertEquals(updatedPriority, jobPriority);
   }
 
+  @Test
+  public void testCleanupSharedCacheUploadPolicies() {
+    Configuration config = new Configuration();
+    Map<String, Boolean> archivePolicies = new HashMap<>();
+    archivePolicies.put("archive1", true);
+    archivePolicies.put("archive2", true);
+    Job.setArchiveSharedCacheUploadPolicies(config, archivePolicies);
+    Map<String, Boolean> filePolicies = new HashMap<>();
+    filePolicies.put("file1", true);
+    filePolicies.put("jar1", true);
+    Job.setFileSharedCacheUploadPolicies(config, filePolicies);
+    Assert.assertEquals(
+        2, Job.getArchiveSharedCacheUploadPolicies(config).size());
+    Assert.assertEquals(
+        2, Job.getFileSharedCacheUploadPolicies(config).size());
+    JobImpl.cleanupSharedCacheUploadPolicies(config);
+    Assert.assertEquals(
+        0, Job.getArchiveSharedCacheUploadPolicies(config).size());
+    Assert.assertEquals(
+        0, Job.getFileSharedCacheUploadPolicies(config).size());
+  }
+
   private static CommitterEventHandler createCommitterEventHandler(
       Dispatcher dispatcher, OutputCommitter committer) {
     final SystemClock clock = SystemClock.getInstance();

Job.java

@@ -1448,27 +1448,22 @@ public class Job extends JobContextImpl implements JobContext, AutoCloseable {
    */
   private static void setSharedCacheUploadPolicies(Configuration conf,
       Map<String, Boolean> policies, boolean areFiles) {
-    if (policies != null) {
-      StringBuilder sb = new StringBuilder();
-      Iterator<Map.Entry<String, Boolean>> it = policies.entrySet().iterator();
-      Map.Entry<String, Boolean> e;
-      if (it.hasNext()) {
-        e = it.next();
-        sb.append(e.getKey() + DELIM + e.getValue());
-      } else {
-        // policies is an empty map, just skip setting the parameter
-        return;
-      }
-      while (it.hasNext()) {
-        e = it.next();
-        sb.append("," + e.getKey() + DELIM + e.getValue());
-      }
-      String confParam =
-          areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES
-              : MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
-      conf.set(confParam, sb.toString());
+    String confParam = areFiles ?
+        MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES :
+        MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
+    // If no policy is provided, we will reset the config by setting an empty
+    // string value. In other words, cleaning up existing policies. This is
+    // useful when we try to clean up shared cache upload policies for
+    // non-application master tasks. See MAPREDUCE-7294 for details.
+    if (policies == null || policies.size() == 0) {
+      conf.set(confParam, "");
+      return;
     }
+    StringBuilder sb = new StringBuilder();
+    policies.forEach((k,v) -> sb.append(k).append(DELIM).append(v).append(","));
+    sb.deleteCharAt(sb.length() - 1);
+    conf.set(confParam, sb.toString());
   }
 
   /**
    * Deserialize a map of shared cache upload policies from a config parameter.
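
The rewritten serialization logic in Job.java can be exercised on its own roughly as follows. This is a sketch, not the shipped code: DELIM here stands in for the delimiter constant defined in Job.java, whose actual value does not appear in this diff, and the class and main() are illustrative only.

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class PolicySerializationSketch {
      // Stand-in for the DELIM constant in Job.java; " " is an assumption,
      // not taken from this diff.
      private static final String DELIM = " ";

      static String serialize(Map<String, Boolean> policies) {
        // An empty or null map now resets the stored value instead of being
        // ignored, which is the heart of MAPREDUCE-7294.
        if (policies == null || policies.isEmpty()) {
          return "";
        }
        StringBuilder sb = new StringBuilder();
        policies.forEach(
            (k, v) -> sb.append(k).append(DELIM).append(v).append(","));
        sb.deleteCharAt(sb.length() - 1); // drop the trailing comma
        return sb.toString();
      }

      public static void main(String[] args) {
        Map<String, Boolean> policies = new LinkedHashMap<>();
        policies.put("file1", true);
        policies.put("jar1", false);
        // e.g. "file1 true,jar1 false" with the assumed DELIM
        System.out.println(serialize(policies));
        // true: an empty map serializes to the empty string
        System.out.println(serialize(Collections.emptyMap()).isEmpty());
      }
    }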