MAPREDUCE-7294. Only application master should upload resource to Yarn Shared Cache (#2223)

Contributed by Zhenzhao Wang <zhenzhaowang@gmail.com>

Signed-off-by: Mingliang Liu <liuml07@apache.org>
This commit is contained in:
zz 2020-09-19 23:10:05 -07:00 committed by Mingliang Liu
parent 75bc54a66d
commit e5e91397de
No known key found for this signature in database
GPG Key ID: BC2FB8C6908A0C16
3 changed files with 39 additions and 20 deletions

View File

@ -1423,7 +1423,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
* be set up to false. In that way, the NMs that host the task containers
* won't try to upload the resources to shared cache.
*/
private static void cleanupSharedCacheUploadPolicies(Configuration conf) {
@VisibleForTesting
static void cleanupSharedCacheUploadPolicies(Configuration conf) {
Job.setArchiveSharedCacheUploadPolicies(conf, Collections.emptyMap());
Job.setFileSharedCacheUploadPolicies(conf, Collections.emptyMap());
}

View File

@ -39,6 +39,7 @@ import java.util.concurrent.CyclicBarrier;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
@ -991,6 +992,28 @@ public class TestJobImpl {
Assert.assertEquals(updatedPriority, jobPriority);
}
@Test
public void testCleanupSharedCacheUploadPolicies() {
Configuration config = new Configuration();
Map<String, Boolean> archivePolicies = new HashMap<>();
archivePolicies.put("archive1", true);
archivePolicies.put("archive2", true);
Job.setArchiveSharedCacheUploadPolicies(config, archivePolicies);
Map<String, Boolean> filePolicies = new HashMap<>();
filePolicies.put("file1", true);
filePolicies.put("jar1", true);
Job.setFileSharedCacheUploadPolicies(config, filePolicies);
Assert.assertEquals(
2, Job.getArchiveSharedCacheUploadPolicies(config).size());
Assert.assertEquals(
2, Job.getFileSharedCacheUploadPolicies(config).size());
JobImpl.cleanupSharedCacheUploadPolicies(config);
Assert.assertEquals(
0, Job.getArchiveSharedCacheUploadPolicies(config).size());
Assert.assertEquals(
0, Job.getFileSharedCacheUploadPolicies(config).size());
}
private static CommitterEventHandler createCommitterEventHandler(
Dispatcher dispatcher, OutputCommitter committer) {
final SystemClock clock = SystemClock.getInstance();

View File

@ -1450,26 +1450,21 @@ public class Job extends JobContextImpl implements JobContext, AutoCloseable {
*/
private static void setSharedCacheUploadPolicies(Configuration conf,
Map<String, Boolean> policies, boolean areFiles) {
if (policies != null) {
StringBuilder sb = new StringBuilder();
Iterator<Map.Entry<String, Boolean>> it = policies.entrySet().iterator();
Map.Entry<String, Boolean> e;
if (it.hasNext()) {
e = it.next();
sb.append(e.getKey() + DELIM + e.getValue());
} else {
// policies is an empty map, just skip setting the parameter
return;
}
while (it.hasNext()) {
e = it.next();
sb.append("," + e.getKey() + DELIM + e.getValue());
}
String confParam =
areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES
: MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
conf.set(confParam, sb.toString());
String confParam = areFiles ?
MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES :
MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
// If no policy is provided, we will reset the config by setting an empty
// string value. In other words, cleaning up existing policies. This is
// useful when we try to clean up shared cache upload policies for
// non-application master tasks. See MAPREDUCE-7294 for details.
if (policies == null || policies.size() == 0) {
conf.set(confParam, "");
return;
}
StringBuilder sb = new StringBuilder();
policies.forEach((k,v) -> sb.append(k).append(DELIM).append(v).append(","));
sb.deleteCharAt(sb.length() - 1);
conf.set(confParam, sb.toString());
}
/**