From 0adc0471d0c06f66a31060f270dcb50a7b4ffafa Mon Sep 17 00:00:00 2001 From: Robert Kanter Date: Mon, 18 Sep 2017 10:40:06 -0700 Subject: [PATCH] MAPREDUCE-6954. Disable erasure coding for files that are uploaded to the MR staging area (pbacsko via rkanter) --- .../hadoop-mapreduce-client-core/pom.xml | 4 ++ .../hadoop/mapreduce/JobResourceUploader.java | 17 +++++++ .../apache/hadoop/mapreduce/MRJobConfig.java | 5 ++ .../src/main/resources/mapred-default.xml | 9 ++++ .../mapreduce/TestJobResourceUploader.java | 46 +++++++++++++++++++ 5 files changed, 81 insertions(+) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml index c34f7bd32f8..ce5fdc86111 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml @@ -42,6 +42,10 @@ org.apache.hadoop hadoop-yarn-common + + org.apache.hadoop + hadoop-hdfs-client + org.apache.hadoop hadoop-hdfs diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java index f1cad57dd41..d9bf988f9b8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java @@ -36,6 +36,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies; import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; import org.apache.hadoop.mapreduce.filecache.DistributedCache; @@ -94,6 +96,11 @@ class JobResourceUploader { new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION); mkdirs(jtFs, submitJobDir, mapredSysPerms); + if (!conf.getBoolean(MRJobConfig.MR_AM_STAGING_DIR_ERASURECODING_ENABLED, + MRJobConfig.DEFAULT_MR_AM_STAGING_ERASURECODING_ENABLED)) { + disableErasureCodingForPath(jtFs, submitJobDir); + } + Collection files = conf.getStringCollection("tmpfiles"); Collection libjars = conf.getStringCollection("tmpjars"); Collection archives = conf.getStringCollection("tmparchives"); @@ -575,4 +582,14 @@ class JobResourceUploader { } return finalPath; } + + private void disableErasureCodingForPath(FileSystem fs, Path path) + throws IOException { + if (jtFs instanceof DistributedFileSystem) { + LOG.info("Disabling Erasure Coding for path: " + path); + DistributedFileSystem dfs = (DistributedFileSystem) jtFs; + dfs.setErasureCodingPolicy(path, + SystemErasureCodingPolicies.getReplicationPolicy().getName()); + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 2023ba3b1d2..86abb42983a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -1037,4 +1037,9 @@ public interface MRJobConfig { String FINISH_JOB_WHEN_REDUCERS_DONE = "mapreduce.job.finish-when-all-reducers-done"; boolean DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE = true; + + String MR_AM_STAGING_DIR_ERASURECODING_ENABLED = + MR_AM_STAGING_DIR + "erasurecoding.enabled"; + + boolean DEFAULT_MR_AM_STAGING_ERASURECODING_ENABLED = false; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index ee9b906faa9..6b6faf20329 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1260,6 +1260,15 @@ + + yarn.app.mapreduce.am.staging-dir.erasurecoding.enabled + false + Whether Erasure Coding should be enabled for + files that are copied to the MR staging area. This is a job-level + setting. + + + mapreduce.am.max-attempts 2 diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java index 20b7b7dcfba..d0d7a349323 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java @@ -20,6 +20,11 @@ package org.apache.hadoop.mapreduce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import java.io.IOException; import java.net.URI; @@ -36,9 +41,12 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies; import org.apache.hadoop.mapred.JobConf; import org.junit.Assert; import org.junit.Test; +import org.mockito.verification.VerificationMode; /** * A class for unit testing JobResourceUploader. @@ -357,6 +365,40 @@ public class TestJobResourceUploader { expectedArchivesWithFrags, expectedJobJar); } + @Test + public void testErasureCodingDefault() throws IOException { + testErasureCodingSetting(true); + } + + @Test + public void testErasureCodingDisabled() throws IOException { + testErasureCodingSetting(false); + } + + private void testErasureCodingSetting(boolean defaultBehavior) + throws IOException { + JobConf jConf = new JobConf(); + // don't set to false if EC remains disabled to check default setting + if (!defaultBehavior) { + jConf.setBoolean(MRJobConfig.MR_AM_STAGING_DIR_ERASURECODING_ENABLED, + true); + } + + DistributedFileSystem fs = mock(DistributedFileSystem.class); + Path path = new Path("/"); + when(fs.makeQualified(any(Path.class))).thenReturn(path); + JobResourceUploader uploader = new StubedUploader(fs, true); + Job job = Job.getInstance(jConf); + + uploader.uploadResources(job, new Path("/test")); + + String replicationPolicyName = SystemErasureCodingPolicies + .getReplicationPolicy().getName(); + VerificationMode mode = defaultBehavior ? times(1) : never(); + verify(fs, mode).setErasureCodingPolicy(eq(path), + eq(replicationPolicyName)); + } + private void runTmpResourcePathTest(JobResourceUploader uploader, ResourceConf rConf, JobConf jConf, String[] expectedFiles, String[] expectedArchives, String expectedJobJar) throws IOException { @@ -698,6 +740,10 @@ public class TestJobResourceUploader { super(FileSystem.getLocal(conf), useWildcard); } + StubedUploader(FileSystem fs, boolean useWildcard) throws IOException { + super(fs, useWildcard); + } + @Override FileStatus getFileStatus(Map statCache, Configuration job, Path p) throws IOException {