diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
index a23827d31f1..ff88bccc27b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
@@ -42,6 +42,10 @@
org.apache.hadoop
hadoop-yarn-common
+
+ org.apache.hadoop
+ hadoop-hdfs-client
+
org.apache.hadoop
hadoop-hdfs
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
index f1cad57dd41..d9bf988f9b8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
@@ -94,6 +96,11 @@ class JobResourceUploader {
new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
mkdirs(jtFs, submitJobDir, mapredSysPerms);
+ if (!conf.getBoolean(MRJobConfig.MR_AM_STAGING_DIR_ERASURECODING_ENABLED,
+ MRJobConfig.DEFAULT_MR_AM_STAGING_ERASURECODING_ENABLED)) {
+ disableErasureCodingForPath(jtFs, submitJobDir);
+ }
+
Collection files = conf.getStringCollection("tmpfiles");
Collection libjars = conf.getStringCollection("tmpjars");
Collection archives = conf.getStringCollection("tmparchives");
@@ -575,4 +582,14 @@ class JobResourceUploader {
}
return finalPath;
}
+
+ private void disableErasureCodingForPath(FileSystem fs, Path path)
+ throws IOException {
+ if (jtFs instanceof DistributedFileSystem) {
+ LOG.info("Disabling Erasure Coding for path: " + path);
+ DistributedFileSystem dfs = (DistributedFileSystem) jtFs;
+ dfs.setErasureCodingPolicy(path,
+ SystemErasureCodingPolicies.getReplicationPolicy().getName());
+ }
+ }
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 2023ba3b1d2..86abb42983a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -1037,4 +1037,9 @@ public interface MRJobConfig {
String FINISH_JOB_WHEN_REDUCERS_DONE =
"mapreduce.job.finish-when-all-reducers-done";
boolean DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE = true;
+
+ String MR_AM_STAGING_DIR_ERASURECODING_ENABLED =
+ MR_AM_STAGING_DIR + "erasurecoding.enabled";
+
+ boolean DEFAULT_MR_AM_STAGING_ERASURECODING_ENABLED = false;
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index ee9b906faa9..6b6faf20329 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1260,6 +1260,15 @@
+
+ yarn.app.mapreduce.am.staging-dir.erasurecoding.enabled
+ false
+ Whether Erasure Coding should be enabled for
+ files that are copied to the MR staging area. This is a job-level
+ setting.
+
+
+
mapreduce.am.max-attempts
2
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
index 20b7b7dcfba..d0d7a349323 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
@@ -20,6 +20,11 @@ package org.apache.hadoop.mapreduce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import java.io.IOException;
import java.net.URI;
@@ -36,9 +41,12 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.mapred.JobConf;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.verification.VerificationMode;
/**
* A class for unit testing JobResourceUploader.
@@ -357,6 +365,40 @@ public class TestJobResourceUploader {
expectedArchivesWithFrags, expectedJobJar);
}
+ @Test
+ public void testErasureCodingDefault() throws IOException {
+ testErasureCodingSetting(true);
+ }
+
+ @Test
+ public void testErasureCodingDisabled() throws IOException {
+ testErasureCodingSetting(false);
+ }
+
+ private void testErasureCodingSetting(boolean defaultBehavior)
+ throws IOException {
+ JobConf jConf = new JobConf();
+ // don't set to false if EC remains disabled to check default setting
+ if (!defaultBehavior) {
+ jConf.setBoolean(MRJobConfig.MR_AM_STAGING_DIR_ERASURECODING_ENABLED,
+ true);
+ }
+
+ DistributedFileSystem fs = mock(DistributedFileSystem.class);
+ Path path = new Path("/");
+ when(fs.makeQualified(any(Path.class))).thenReturn(path);
+ JobResourceUploader uploader = new StubedUploader(fs, true);
+ Job job = Job.getInstance(jConf);
+
+ uploader.uploadResources(job, new Path("/test"));
+
+ String replicationPolicyName = SystemErasureCodingPolicies
+ .getReplicationPolicy().getName();
+ VerificationMode mode = defaultBehavior ? times(1) : never();
+ verify(fs, mode).setErasureCodingPolicy(eq(path),
+ eq(replicationPolicyName));
+ }
+
private void runTmpResourcePathTest(JobResourceUploader uploader,
ResourceConf rConf, JobConf jConf, String[] expectedFiles,
String[] expectedArchives, String expectedJobJar) throws IOException {
@@ -698,6 +740,10 @@ public class TestJobResourceUploader {
super(FileSystem.getLocal(conf), useWildcard);
}
+ StubedUploader(FileSystem fs, boolean useWildcard) throws IOException {
+ super(fs, useWildcard);
+ }
+
@Override
FileStatus getFileStatus(Map statCache, Configuration job,
Path p) throws IOException {