MAPREDUCE-6954. Disable erasure coding for files that are uploaded to the MR staging area (pbacsko via rkanter)

This commit is contained in:
Robert Kanter 2017-09-18 10:40:06 -07:00
parent 5f496683fb
commit 0adc0471d0
5 changed files with 81 additions and 0 deletions

View File

@ -42,6 +42,10 @@
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId> <artifactId>hadoop-yarn-common</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId> <artifactId>hadoop-hdfs</artifactId>

View File

@ -36,6 +36,8 @@
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.mapreduce.filecache.DistributedCache;
@ -94,6 +96,11 @@ public void uploadResources(Job job, Path submitJobDir) throws IOException {
new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION); new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
mkdirs(jtFs, submitJobDir, mapredSysPerms); mkdirs(jtFs, submitJobDir, mapredSysPerms);
if (!conf.getBoolean(MRJobConfig.MR_AM_STAGING_DIR_ERASURECODING_ENABLED,
MRJobConfig.DEFAULT_MR_AM_STAGING_ERASURECODING_ENABLED)) {
disableErasureCodingForPath(jtFs, submitJobDir);
}
Collection<String> files = conf.getStringCollection("tmpfiles"); Collection<String> files = conf.getStringCollection("tmpfiles");
Collection<String> libjars = conf.getStringCollection("tmpjars"); Collection<String> libjars = conf.getStringCollection("tmpjars");
Collection<String> archives = conf.getStringCollection("tmparchives"); Collection<String> archives = conf.getStringCollection("tmparchives");
@ -575,4 +582,14 @@ private String validateFilePath(String file, Configuration conf)
} }
return finalPath; return finalPath;
} }
private void disableErasureCodingForPath(FileSystem fs, Path path)
throws IOException {
if (jtFs instanceof DistributedFileSystem) {
LOG.info("Disabling Erasure Coding for path: " + path);
DistributedFileSystem dfs = (DistributedFileSystem) jtFs;
dfs.setErasureCodingPolicy(path,
SystemErasureCodingPolicies.getReplicationPolicy().getName());
}
}
} }

View File

@ -1037,4 +1037,9 @@ public interface MRJobConfig {
String FINISH_JOB_WHEN_REDUCERS_DONE = String FINISH_JOB_WHEN_REDUCERS_DONE =
"mapreduce.job.finish-when-all-reducers-done"; "mapreduce.job.finish-when-all-reducers-done";
boolean DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE = true; boolean DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE = true;
String MR_AM_STAGING_DIR_ERASURECODING_ENABLED =
MR_AM_STAGING_DIR + "erasurecoding.enabled";
boolean DEFAULT_MR_AM_STAGING_ERASURECODING_ENABLED = false;
} }

View File

@ -1260,6 +1260,15 @@
</description> </description>
</property> </property>
<property>
<name>yarn.app.mapreduce.am.staging-dir.erasurecoding.enabled</name>
<value>false</value>
<description>Whether Erasure Coding should be enabled for
files that are copied to the MR staging area. This is a job-level
setting.
</description>
</property>
<property> <property>
<name>mapreduce.am.max-attempts</name> <name>mapreduce.am.max-attempts</name>
<value>2</value> <value>2</value>

View File

@ -20,6 +20,11 @@
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
@ -36,9 +41,12 @@
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.verification.VerificationMode;
/** /**
* A class for unit testing JobResourceUploader. * A class for unit testing JobResourceUploader.
@ -357,6 +365,40 @@ public void testPathsWithFragsAndWildCard() throws IOException {
expectedArchivesWithFrags, expectedJobJar); expectedArchivesWithFrags, expectedJobJar);
} }
@Test
public void testErasureCodingDefault() throws IOException {
testErasureCodingSetting(true);
}
@Test
public void testErasureCodingDisabled() throws IOException {
testErasureCodingSetting(false);
}
private void testErasureCodingSetting(boolean defaultBehavior)
throws IOException {
JobConf jConf = new JobConf();
// don't set to false if EC remains disabled to check default setting
if (!defaultBehavior) {
jConf.setBoolean(MRJobConfig.MR_AM_STAGING_DIR_ERASURECODING_ENABLED,
true);
}
DistributedFileSystem fs = mock(DistributedFileSystem.class);
Path path = new Path("/");
when(fs.makeQualified(any(Path.class))).thenReturn(path);
JobResourceUploader uploader = new StubedUploader(fs, true);
Job job = Job.getInstance(jConf);
uploader.uploadResources(job, new Path("/test"));
String replicationPolicyName = SystemErasureCodingPolicies
.getReplicationPolicy().getName();
VerificationMode mode = defaultBehavior ? times(1) : never();
verify(fs, mode).setErasureCodingPolicy(eq(path),
eq(replicationPolicyName));
}
private void runTmpResourcePathTest(JobResourceUploader uploader, private void runTmpResourcePathTest(JobResourceUploader uploader,
ResourceConf rConf, JobConf jConf, String[] expectedFiles, ResourceConf rConf, JobConf jConf, String[] expectedFiles,
String[] expectedArchives, String expectedJobJar) throws IOException { String[] expectedArchives, String expectedJobJar) throws IOException {
@ -698,6 +740,10 @@ private class StubedUploader extends JobResourceUploader {
super(FileSystem.getLocal(conf), useWildcard); super(FileSystem.getLocal(conf), useWildcard);
} }
StubedUploader(FileSystem fs, boolean useWildcard) throws IOException {
super(fs, useWildcard);
}
@Override @Override
FileStatus getFileStatus(Map<URI, FileStatus> statCache, Configuration job, FileStatus getFileStatus(Map<URI, FileStatus> statCache, Configuration job,
Path p) throws IOException { Path p) throws IOException {