MAPREDUCE-6095. Enable DistributedCache for uber-mode Jobs. Contributed by Gera Shegalov

(cherry picked from commit 7039b98e1c)
Author: Jason Lowe
Date:   2014-09-22 15:20:59 +00:00
Parent: c1a3819a4d
Commit: f4534746fb
5 changed files with 62 additions and 73 deletions
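
In short: the DistributedCache localization logic moves from YarnChild into the shared helper MRApps.setupDistributedCacheLocal, which the MRAppMaster now also invokes for uber-mode jobs, and the TestUberAM stub that skipped distributed-cache coverage under uber mode is removed.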

CHANGES.txt

@@ -143,6 +143,9 @@ Release 2.6.0 - UNRELEASED
     ApplicationNotFoundException if the job rolled off the RM view (Sangjin
     Lee via jlowe)
 
+    MAPREDUCE-6095. Enable DistributedCache for uber-mode Jobs (Gera Shegalov
+    via jlowe)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

YarnChild.java

@@ -23,15 +23,11 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
-import java.net.URI;
 import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -43,7 +39,6 @@ import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.counters.Limits;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -301,7 +296,7 @@
       task.localizeConfiguration(job);
 
       // Set up the DistributedCache related configs
-      setupDistributedCacheConfig(job);
+      MRApps.setupDistributedCacheLocal(job);
 
       // Overwrite the localized task jobconf which is linked to in the current
       // work-dir.
@@ -311,62 +306,6 @@
     task.setConf(job);
   }
 
-  /**
-   * Set up the DistributedCache related configs to make
-   * {@link DistributedCache#getLocalCacheFiles(Configuration)}
-   * and
-   * {@link DistributedCache#getLocalCacheArchives(Configuration)}
-   * working.
-   * @param job
-   * @throws IOException
-   */
-  private static void setupDistributedCacheConfig(final JobConf job)
-      throws IOException {
-    String localWorkDir = System.getenv("PWD");
-    //        ^ ^ all symlinks are created in the current work-dir
-
-    // Update the configuration object with localized archives.
-    URI[] cacheArchives = DistributedCache.getCacheArchives(job);
-    if (cacheArchives != null) {
-      List<String> localArchives = new ArrayList<String>();
-      for (int i = 0; i < cacheArchives.length; ++i) {
-        URI u = cacheArchives[i];
-        Path p = new Path(u);
-        Path name =
-            new Path((null == u.getFragment()) ? p.getName()
-                : u.getFragment());
-        String linkName = name.toUri().getPath();
-        localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
-      }
-      if (!localArchives.isEmpty()) {
-        job.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
-            .arrayToString(localArchives.toArray(new String[localArchives
-                .size()])));
-      }
-    }
-
-    // Update the configuration object with localized files.
-    URI[] cacheFiles = DistributedCache.getCacheFiles(job);
-    if (cacheFiles != null) {
-      List<String> localFiles = new ArrayList<String>();
-      for (int i = 0; i < cacheFiles.length; ++i) {
-        URI u = cacheFiles[i];
-        Path p = new Path(u);
-        Path name =
-            new Path((null == u.getFragment()) ? p.getName()
-                : u.getFragment());
-        String linkName = name.toUri().getPath();
-        localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
-      }
-      if (!localFiles.isEmpty()) {
-        job.set(MRJobConfig.CACHE_LOCALFILES,
-            StringUtils.arrayToString(localFiles
-                .toArray(new String[localFiles.size()])));
-      }
-    }
-  }
-
   private static final FsPermission urw_gr =
       FsPermission.createImmutable((short) 0640);
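
The heart of the moved method is the link-name resolution: a cache URI's fragment, when present, names the symlink in the task work-dir; otherwise the file's base name is used. A minimal standalone sketch of that mapping (the class name and sample URIs are illustrative, not part of the patch):

import java.net.URI;
import org.apache.hadoop.fs.Path;

public class CacheLinkNameDemo {
  /** Resolve the localized path of a cache URI under the task work-dir. */
  static String localizedPath(String workDir, URI cacheUri) {
    Path p = new Path(cacheUri);
    // The fragment overrides the base name, letting users rename the link.
    Path name = new Path(cacheUri.getFragment() == null
        ? p.getName() : cacheUri.getFragment());
    return new Path(workDir, name.toUri().getPath()).toUri().getPath();
  }

  public static void main(String[] args) throws Exception {
    // hdfs://nn/data/dict.txt#words -> /tmp/work/words
    System.out.println(
        localizedPath("/tmp/work", new URI("hdfs://nn/data/dict.txt#words")));
    // hdfs://nn/data/dict.txt -> /tmp/work/dict.txt
    System.out.println(
        localizedPath("/tmp/work", new URI("hdfs://nn/data/dict.txt")));
  }
}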

MRAppMaster.java

@@ -807,6 +807,7 @@ public class MRAppMaster extends CompositeService {
     @Override
     protected void serviceStart() throws Exception {
       if (job.isUber()) {
+        MRApps.setupDistributedCacheLocal(getConfig());
         this.containerAllocator = new LocalContainerAllocator(
             this.clientService, this.context, nmHost, nmPort, nmHttpPort
             , containerID);
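
An uber job runs its tasks inside the MRAppMaster's own JVM, so YarnChild, which previously performed this rewrite, is never launched for them; calling MRApps.setupDistributedCacheLocal on the AM's configuration before the LocalContainerAllocator is created makes the localized cache paths visible to the in-process tasks.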

MRApps.java

@@ -26,12 +26,11 @@ import java.net.URISyntaxException;
 import java.security.AccessController;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -58,8 +57,6 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.util.ApplicationClassLoader;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.ContainerLogAppender;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -469,6 +466,62 @@ public class MRApps extends Apps {
         DistributedCache.getFileVisibilities(conf));
   }
 
+  /**
+   * Set up the DistributedCache related configs to make
+   * {@link DistributedCache#getLocalCacheFiles(Configuration)}
+   * and
+   * {@link DistributedCache#getLocalCacheArchives(Configuration)}
+   * work.
+   * @param conf the configuration to update with the localized paths
+   * @throws java.io.IOException
+   */
+  public static void setupDistributedCacheLocal(Configuration conf)
+      throws IOException {
+    String localWorkDir = System.getenv("PWD");
+    //        ^ ^ all symlinks are created in the current work-dir
+
+    // Update the configuration object with localized archives.
+    URI[] cacheArchives = DistributedCache.getCacheArchives(conf);
+    if (cacheArchives != null) {
+      List<String> localArchives = new ArrayList<String>();
+      for (int i = 0; i < cacheArchives.length; ++i) {
+        URI u = cacheArchives[i];
+        Path p = new Path(u);
+        Path name =
+            new Path((null == u.getFragment()) ? p.getName()
+                : u.getFragment());
+        String linkName = name.toUri().getPath();
+        localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
+      }
+      if (!localArchives.isEmpty()) {
+        conf.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
+            .arrayToString(localArchives.toArray(new String[localArchives
+                .size()])));
+      }
+    }
+
+    // Update the configuration object with localized files.
+    URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
+    if (cacheFiles != null) {
+      List<String> localFiles = new ArrayList<String>();
+      for (int i = 0; i < cacheFiles.length; ++i) {
+        URI u = cacheFiles[i];
+        Path p = new Path(u);
+        Path name =
+            new Path((null == u.getFragment()) ? p.getName()
+                : u.getFragment());
+        String linkName = name.toUri().getPath();
+        localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
+      }
+      if (!localFiles.isEmpty()) {
+        conf.set(MRJobConfig.CACHE_LOCALFILES,
+            StringUtils.arrayToString(localFiles
+                .toArray(new String[localFiles.size()])));
+      }
+    }
+  }
+
   private static String getResourceDescription(LocalResourceType type) {
     if (type == LocalResourceType.ARCHIVE || type == LocalResourceType.PATTERN) {
       return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") ";
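
For context, a minimal sketch of the task-side view this helper repairs for uber jobs (class and file names are illustrative, not from the patch); once mapreduce.job.cache.local.files is populated, the localized paths surface through the standard accessors:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DictLookupMapper
    extends Mapper<LongWritable, Text, Text, LongWritable> {

  @Override
  protected void setup(Context context)
      throws IOException, InterruptedException {
    // Before this change, getLocalCacheFiles() returned null in uber mode
    // because the *.cache.local.* properties were never set for the AM.
    Path[] localFiles = context.getLocalCacheFiles();
    if (localFiles != null && localFiles.length > 0) {
      try (BufferedReader reader = new BufferedReader(
          new FileReader(localFiles[0].toString()))) {
        // ... load side data, e.g. a dictionary, from the local copy ...
      }
    }
  }
}

At submit time the file would be attached with, e.g., job.addCacheFile(new URI("hdfs:///apps/dict.txt#dict")), after which the fragment "dict" also resolves as a symlink in the task work-dir.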

TestUberAM.java

@@ -191,11 +191,4 @@ public class TestUberAM extends TestMRJobs {
       throws IOException, InterruptedException, ClassNotFoundException {
     super.testSleepJobWithSecurityOn();
   }
-
-  // Add a test for distcache when uber mode is enabled. TODO
-  @Override
-  @Test
-  public void testDistributedCache() throws Exception {
-    //
-  }
 }
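
With the no-op override gone, TestUberAM inherits TestMRJobs#testDistributedCache and now exercises the distributed cache with uber mode enabled, covering the new MRApps.setupDistributedCacheLocal path.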