MAPREDUCE-6719. The list of -libjars archives should be replaced with a wildcard in the distributed cache to reduce the application footprint in the state store (Daniel Templeton via sjlee)

This commit is contained in:
Sangjin Lee 2016-06-21 11:25:11 -07:00
parent e15cd43369
commit 605b4b6136
12 changed files with 511 additions and 88 deletions
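A hedged sketch of the effect on the submitted job configuration (the class name and paths below are hypothetical, not taken from this commit): instead of one mapreduce.job.cache.files entry per -libjars archive, a single wildcard entry now covers the whole staging directory.

import org.apache.hadoop.conf.Configuration;

public class LibjarsWildcardSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    // Before: one cache entry per archive passed via -libjars.
    conf.set("mapreduce.job.cache.files",
        "hdfs://nn/staging/job_1/libjars/a.jar,"
            + "hdfs://nn/staging/job_1/libjars/b.jar");
    // After: a single wildcard entry for the libjars directory, which is
    // what shrinks the per-application footprint in the state store.
    conf.set("mapreduce.job.cache.files", "hdfs://nn/staging/job_1/libjars/*");
    System.out.println(conf.get("mapreduce.job.cache.files"));
  }
}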

View File

@ -300,12 +300,36 @@ public class MRApps extends Apps {
for (URI u: withLinks) {
Path p = new Path(u);
FileSystem remoteFS = p.getFileSystem(conf);
String name = p.getName();
String wildcard = null;
// If the path is wildcarded, resolve its parent directory instead
if (name.equals(DistributedCache.WILDCARD)) {
wildcard = name;
p = p.getParent();
}
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory()));
String name = (null == u.getFragment())
? p.getName() : u.getFragment();
if ((wildcard != null) && (u.getFragment() != null)) {
throw new IOException("Invalid path URI: " + p + " - cannot "
+ "contain both a URI fragment and a wildcard");
} else if (wildcard != null) {
name = p.getName() + Path.SEPARATOR + wildcard;
} else if (u.getFragment() != null) {
name = u.getFragment();
}
// If it's not a JAR, add it to the link lookup.
if (!StringUtils.toLowerCase(name).endsWith(".jar")) {
linkLookup.put(p, name);
String old = linkLookup.put(p, name);
if ((old != null) && !name.equals(old)) {
LOG.warn("The same path is included more than once "
+ "with different links or wildcards: " + p + " [" +
name + ", " + old + "]");
}
}
}
}
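A minimal sketch of the wildcard resolution above (the class name and URI are hypothetical): when the last path component equals DistributedCache.WILDCARD, the parent directory is resolved instead and the link name keeps the trailing "*".

import java.net.URI;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;

public class WildcardLinkNameSketch {
  static String linkNameFor(URI u) {
    Path p = new Path(u);
    String wildcard = null;
    if (p.getName().equals(DistributedCache.WILDCARD)) {
      wildcard = p.getName();
      p = p.getParent();  // resolve the directory, not the literal "*"
    }
    // Yields "libjars/*" for hdfs://nn/staging/libjars/*
    return (wildcard == null) ? p.getName()
        : p.getName() + Path.SEPARATOR + wildcard;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(linkNameFor(new URI("hdfs://nn/staging/libjars/*")));
  }
}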
@ -559,16 +583,42 @@ public class MRApps extends Apps {
URI u = uris[i];
Path p = new Path(u);
FileSystem remoteFS = p.getFileSystem(conf);
String linkName = null;
if (p.getName().equals(DistributedCache.WILDCARD)) {
p = p.getParent();
linkName = p.getName() + Path.SEPARATOR + DistributedCache.WILDCARD;
}
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory()));
// Add URI fragment or just the filename
Path name = new Path((null == u.getFragment())
? p.getName()
: u.getFragment());
if (name.isAbsolute()) {
throw new IllegalArgumentException("Resource name must be relative");
// If there's no wildcard, try using the fragment for the link
if (linkName == null) {
linkName = u.getFragment();
// Because we don't know what's in the fragment, we have to handle
// it with care.
if (linkName != null) {
Path linkPath = new Path(linkName);
if (linkPath.isAbsolute()) {
throw new IllegalArgumentException("Resource name must be "
+ "relative");
}
linkName = linkPath.toUri().getPath();
}
} else if (u.getFragment() != null) {
throw new IllegalArgumentException("Invalid path URI: " + p +
" - cannot contain both a URI fragment and a wildcard");
}
String linkName = name.toUri().getPath();
// If there's no wildcard or fragment, just link to the file name
if (linkName == null) {
linkName = p.getName();
}
LocalResource orig = localResources.get(linkName);
if(orig != null && !orig.getResource().equals(URL.fromURI(p.toUri()))) {
throw new InvalidJobConfException(
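The fragment/wildcard conflict checked above can be triggered from user code; a hedged sketch with hypothetical URIs (a fragment supplies a custom link name, so it cannot be combined with a wildcard):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class FragmentWildcardSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    job.addCacheFile(new URI("hdfs://nn/libs/a.jar#mylink")); // linked as "mylink"
    job.addCacheFile(new URI("hdfs://nn/libs/*"));            // directory wildcard
    // Combining both on one URI is rejected when MRApps sets up the
    // distributed cache:
    // job.addCacheFile(new URI("hdfs://nn/libs/*#mylink"));
  }
}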

View File

@ -95,10 +95,13 @@ public class Job extends JobContextImpl implements JobContext {
static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;
public static final String USED_GENERIC_PARSER =
"mapreduce.client.genericoptionsparser.used";
"mapreduce.client.genericoptionsparser.used";
public static final String SUBMIT_REPLICATION =
"mapreduce.client.submit.file.replication";
"mapreduce.client.submit.file.replication";
public static final int DEFAULT_SUBMIT_REPLICATION = 10;
public static final String USE_WILDCARD_FOR_LIBJARS =
"mapreduce.client.libjars.wildcard";
public static final boolean DEFAULT_USE_WILDCARD_FOR_LIBJARS = true;
@InterfaceStability.Evolving
public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
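A short sketch of opting a single job out of the new behavior using the constant defined above (the helper class name is hypothetical; the rest of the job setup is elided):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class DisableWildcardSketch {
  public static Job newNonWildcardJob() throws Exception {
    Configuration conf = new Configuration();
    // Defaults to true; false restores one cache entry per libjars archive.
    conf.setBoolean(Job.USE_WILDCARD_FOR_LIBJARS, false);
    return Job.getInstance(conf);
  }
}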

View File

@ -19,10 +19,8 @@ package org.apache.hadoop.mapreduce;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -40,10 +38,12 @@ import org.apache.hadoop.mapreduce.filecache.DistributedCache;
@InterfaceStability.Unstable
class JobResourceUploader {
protected static final Log LOG = LogFactory.getLog(JobResourceUploader.class);
private FileSystem jtFs;
private final boolean useWildcard;
private final FileSystem jtFs;
JobResourceUploader(FileSystem submitFs) {
JobResourceUploader(FileSystem submitFs, boolean useWildcard) {
this.jtFs = submitFs;
this.useWildcard = useWildcard;
}
/**
@ -126,8 +126,18 @@ class JobResourceUploader {
for (String tmpjars : libjarsArr) {
Path tmp = new Path(tmpjars);
Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
// Add each file to the classpath
DistributedCache.addFileToClassPath(
new Path(newPath.toUri().getPath()), conf, jtFs);
new Path(newPath.toUri().getPath()), conf, jtFs, !useWildcard);
}
if (useWildcard) {
// Add the whole directory to the cache
Path libJarsDirWildcard =
jtFs.makeQualified(new Path(libjarsDir, DistributedCache.WILDCARD));
DistributedCache.addCacheFile(libJarsDirWildcard.toUri(), conf);
}
}
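A hedged sketch of what the code above records when useWildcard is true (the staging directory and jar names are hypothetical): each jar contributes a classpath entry only, and a single wildcard cache entry covers the whole directory.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;

public class UploaderWildcardSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path libjarsDir = new Path("/tmp/staging/job_1/libjars");
    for (String jar : new String[] {"a.jar", "b.jar"}) {
      // Classpath only; the cache entry is deferred to the wildcard below.
      DistributedCache.addFileToClassPath(new Path(libjarsDir, jar), conf, fs,
          false);
    }
    Path wildcard =
        fs.makeQualified(new Path(libjarsDir, DistributedCache.WILDCARD));
    DistributedCache.addCacheFile(wildcard.toUri(), conf); // one entry for all
  }
}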

View File

@ -41,10 +41,10 @@ public class JobSubmissionFiles {
// job submission directory is private!
final public static FsPermission JOB_DIR_PERMISSION =
FsPermission.createImmutable((short) 0700); // rwx--------
FsPermission.createImmutable((short) 0700); // rwx------
//job files are world-wide readable and owner writable
final public static FsPermission JOB_FILE_PERMISSION =
FsPermission.createImmutable((short) 0644); // rw-r--r--
FsPermission.createImmutable((short) 0644); // rw-r--r--
public static Path getJobSplitFile(Path jobSubmissionDir) {
return new Path(jobSubmissionDir, "job.split");

View File

@ -94,7 +94,11 @@ class JobSubmitter {
*/
private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
throws IOException {
JobResourceUploader rUploader = new JobResourceUploader(jtFs);
Configuration conf = job.getConfiguration();
boolean useWildcards = conf.getBoolean(Job.USE_WILDCARD_FOR_LIBJARS,
Job.DEFAULT_USE_WILDCARD_FOR_LIBJARS);
JobResourceUploader rUploader = new JobResourceUploader(jtFs, useWildcards);
rUploader.uploadFiles(job, jobSubmitDir);
// Get the working directory. If not set, sets it to filesystem working dir

View File

@ -227,21 +227,27 @@ public class ClientDistributedCacheManager {
/**
* Returns a boolean to denote whether a cache file is visible to all(public)
* or not
* @param conf
* @param uri
* @param conf the configuration
* @param uri the URI to test
* @return true if the path in the uri is visible to all, false otherwise
* @throws IOException
* @throws IOException thrown if a file system operation fails
*/
static boolean isPublic(Configuration conf, URI uri,
Map<URI, FileStatus> statCache) throws IOException {
boolean isPublic = true;
FileSystem fs = FileSystem.get(uri, conf);
Path current = new Path(uri.getPath());
current = fs.makeQualified(current);
//the leaf level file should be readable by others
if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
return false;
// If we're looking at a wildcarded path, we only need to check that the
// ancestors allow execution. Otherwise, look for read permissions in
// addition to the ancestors' permissions.
if (!current.getName().equals(DistributedCache.WILDCARD)) {
isPublic = checkPermissionOfOther(fs, current, FsAction.READ, statCache);
}
return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache);
return isPublic &&
ancestorsHaveExecutePermissions(fs, current.getParent(), statCache);
}
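The visibility rule above can be restated with a small sketch of the per-level "other" permission checks (the class name and permission bits are hypothetical):

import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;

public class OtherPermissionSketch {
  public static void main(String[] args) {
    FsPermission dir = new FsPermission((short) 0750);  // rwxr-x---
    FsPermission jar = new FsPermission((short) 0644);  // rw-r--r--
    // Every ancestor directory must be executable by "other"...
    System.out.println(dir.getOtherAction().implies(FsAction.EXECUTE)); // false
    // ...and a non-wildcard leaf must additionally be readable by "other".
    // For a wildcarded path, this leaf-level read check is skipped.
    System.out.println(jar.getOtherAction().implies(FsAction.READ));    // true
  }
}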
/**
@ -284,11 +290,20 @@ public class ClientDistributedCacheManager {
private static FileStatus getFileStatus(FileSystem fs, URI uri,
Map<URI, FileStatus> statCache) throws IOException {
Path path = new Path(uri);
if (path.getName().equals(DistributedCache.WILDCARD)) {
path = path.getParent();
uri = path.toUri();
}
FileStatus stat = statCache.get(uri);
if (stat == null) {
stat = fs.getFileStatus(new Path(uri));
stat = fs.getFileStatus(path);
statCache.put(uri, stat);
}
return stat;
}
}

View File

@ -126,12 +126,14 @@ import java.net.URI;
* as well as methods intended for use by the MapReduce framework
* (e.g., {@link org.apache.hadoop.mapred.JobClient}).
*
* @see org.apache.hadoop.mapreduce.Job
* @see org.apache.hadoop.mapred.JobConf
* @see org.apache.hadoop.mapred.JobClient
*/
@Deprecated
@InterfaceAudience.Private
public class DistributedCache {
public static final String WILDCARD = "*";
/**
* Set the configuration with the given set of archives. Intended
@ -139,6 +141,7 @@ public class DistributedCache {
* @param archives The list of archives that need to be localized
* @param conf Configuration which will be changed
* @deprecated Use {@link Job#setCacheArchives(URI[])} instead
* @see Job#setCacheArchives(URI[])
*/
@Deprecated
public static void setCacheArchives(URI[] archives, Configuration conf) {
@ -152,6 +155,7 @@ public class DistributedCache {
* @param files The list of files that need to be localized
* @param conf Configuration which will be changed
* @deprecated Use {@link Job#setCacheFiles(URI[])} instead
* @see Job#setCacheFiles(URI[])
*/
@Deprecated
public static void setCacheFiles(URI[] files, Configuration conf) {
@ -166,6 +170,7 @@ public class DistributedCache {
* @return A URI array of the caches set in the Configuration
* @throws IOException
* @deprecated Use {@link JobContext#getCacheArchives()} instead
* @see JobContext#getCacheArchives()
*/
@Deprecated
public static URI[] getCacheArchives(Configuration conf) throws IOException {
@ -179,6 +184,7 @@ public class DistributedCache {
* @return A URI array of the files set in the Configuration
* @throws IOException
* @deprecated Use {@link JobContext#getCacheFiles()} instead
* @see JobContext#getCacheFiles()
*/
@Deprecated
public static URI[] getCacheFiles(Configuration conf) throws IOException {
@ -192,6 +198,7 @@ public class DistributedCache {
* @return A path array of localized caches
* @throws IOException
* @deprecated Use {@link JobContext#getLocalCacheArchives()} instead
* @see JobContext#getLocalCacheArchives()
*/
@Deprecated
public static Path[] getLocalCacheArchives(Configuration conf)
@ -207,6 +214,7 @@ public class DistributedCache {
* @return A path array of localized files
* @throws IOException
* @deprecated Use {@link JobContext#getLocalCacheFiles()} instead
* @see JobContext#getLocalCacheFiles()
*/
@Deprecated
public static Path[] getLocalCacheFiles(Configuration conf)
@ -236,6 +244,7 @@ public class DistributedCache {
* @param conf The configuration which stored the timestamps
* @return a long array of timestamps
* @deprecated Use {@link JobContext#getArchiveTimestamps()} instead
* @see JobContext#getArchiveTimestamps()
*/
@Deprecated
public static long[] getArchiveTimestamps(Configuration conf) {
@ -250,6 +259,7 @@ public class DistributedCache {
* @param conf The configuration which stored the timestamps
* @return a long array of timestamps
* @deprecated Use {@link JobContext#getFileTimestamps()} instead
* @see JobContext#getFileTimestamps()
*/
@Deprecated
public static long[] getFileTimestamps(Configuration conf) {
@ -263,6 +273,7 @@ public class DistributedCache {
* @param uri The uri of the cache to be localized
* @param conf Configuration to add the cache to
* @deprecated Use {@link Job#addCacheArchive(URI)} instead
* @see Job#addCacheArchive(URI)
*/
@Deprecated
public static void addCacheArchive(URI uri, Configuration conf) {
@ -272,11 +283,27 @@ public class DistributedCache {
}
/**
* Add a file to be localized to the conf. Intended
* to be used by user code.
* Add a file to be localized to the conf. The localized file will be
* downloaded to the execution node(s), and a link will be created to the
* file from the job's working directory. If the last part of the URI's path name
* is "*", then the entire parent directory will be localized and links
* will be created from the job's working directory to each file in the
* parent directory.
*
* The access permissions of the file will determine whether the localized
* file will be shared across jobs. If the file is not readable by other or
* if any of its parent directories is not executable by other, then the
* file will not be shared. In the case of a path that ends in "/*",
* sharing of the localized files will be determined solely from the
* access permissions of the parent directories. The access permissions of
* the individual files will be ignored.
*
* Intended to be used by user code.
*
* @param uri The uri of the cache to be localized
* @param conf Configuration to add the cache to
* @deprecated Use {@link Job#addCacheFile(URI)} instead
* @see Job#addCacheFile(URI)
*/
@Deprecated
public static void addCacheFile(URI uri, Configuration conf) {
@ -286,12 +313,14 @@ public class DistributedCache {
}
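A usage sketch of the wildcard form documented above (the URI and class name are hypothetical; new code would go through Job#addCacheFile rather than this deprecated entry point):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;

public class AddWildcardCacheFileSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(false);
    // Localizes every file under /apps/libs and links each one into the
    // job's working directory; visibility is decided from the directory's
    // ancestors only, as described in the javadoc above.
    DistributedCache.addCacheFile(new URI("hdfs://nn:8020/apps/libs/*"), conf);
    System.out.println(conf.get("mapreduce.job.cache.files"));
  }
}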
/**
* Add an file path to the current set of classpath entries It adds the file
* to cache as well. Intended to be used by user code.
* Add a file path to the current set of classpath entries. The file will
* also be added to the cache. Intended to be used by user code.
*
* @param file Path of the file to be added
* @param conf Configuration that contains the classpath setting
* @deprecated Use {@link Job#addFileToClassPath(Path)} instead
* @see #addCacheFile(URI, Configuration)
* @see Job#addFileToClassPath(Path)
*/
@Deprecated
public static void addFileToClassPath(Path file, Configuration conf)
@ -300,22 +329,42 @@ public class DistributedCache {
}
/**
* Add a file path to the current set of classpath entries. It adds the file
* to cache as well. Intended to be used by user code.
* Add a file path to the current set of classpath entries. The file will
* also be added to the cache. Intended to be used by user code.
*
* @param file Path of the file to be added
* @param conf Configuration that contains the classpath setting
* @param fs FileSystem with respect to which {@code file} should
* be interpreted.
* @see #addCacheFile(URI, Configuration)
*/
public static void addFileToClassPath
(Path file, Configuration conf, FileSystem fs)
throws IOException {
public static void addFileToClassPath(Path file, Configuration conf,
FileSystem fs) {
addFileToClassPath(file, conf, fs, true);
}
/**
* Add a file path to the current set of classpath entries. The file will
* also be added to the cache if {@code addToCache} is true. Used by
* internal DistributedCache code.
*
* @param file Path of the file to be added
* @param conf Configuration that contains the classpath setting
* @param fs FileSystem with respect to which {@code file} should
* be interpreted.
* @param addToCache whether the file should also be added to the cache list
* @see #addCacheFile(URI, Configuration)
*/
public static void addFileToClassPath(Path file, Configuration conf,
FileSystem fs, boolean addToCache) {
String classpath = conf.get(MRJobConfig.CLASSPATH_FILES);
conf.set(MRJobConfig.CLASSPATH_FILES, classpath == null ? file.toString()
: classpath + "," + file.toString());
URI uri = fs.makeQualified(file).toUri();
addCacheFile(uri, conf);
if (addToCache) {
URI uri = fs.makeQualified(file).toUri();
addCacheFile(uri, conf);
}
}
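A short sketch of the new addToCache flag, matching the unit test added later in this commit (the local path and class name are hypothetical): with false, only the classpath entry is recorded, on the assumption that a wildcard cache entry will cover the file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;

public class AddToCacheFlagSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(false);
    FileSystem fs = FileSystem.getLocal(conf);
    DistributedCache.addFileToClassPath(new Path("file:///a.jar"), conf, fs,
        false);
    // Prints "file:/a.jar" for the classpath and null for the cache list.
    System.out.println(conf.get("mapreduce.job.classpath.files"));
    System.out.println(conf.get("mapreduce.job.cache.files"));
  }
}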
/**
@ -323,7 +372,8 @@ public class DistributedCache {
* Used by internal DistributedCache code.
*
* @param conf Configuration that contains the classpath setting
* @deprecated Use {@link JobContext#getFileClassPaths()} instead
* @deprecated Use {@link JobContext#getFileClassPaths()} instead
* @see JobContext#getFileClassPaths()
*/
@Deprecated
public static Path[] getFileClassPaths(Configuration conf) {
@ -346,6 +396,7 @@ public class DistributedCache {
* @param archive Path of the archive to be added
* @param conf Configuration that contains the classpath setting
* @deprecated Use {@link Job#addArchiveToClassPath(Path)} instead
* @see Job#addArchiveToClassPath(Path)
*/
@Deprecated
public static void addArchiveToClassPath(Path archive, Configuration conf)
@ -378,6 +429,7 @@ public class DistributedCache {
*
* @param conf Configuration that contains the classpath setting
* @deprecated Use {@link JobContext#getArchiveClassPaths()} instead
* @see JobContext#getArchiveClassPaths()
*/
@Deprecated
public static Path[] getArchiveClassPaths(Configuration conf) {

View File

@ -842,6 +842,24 @@
</description>
</property>
<property>
<name>mapreduce.client.libjars.wildcard</name>
<value>true</value>
<description>
Whether the libjars cache files should be localized using
a wildcarded directory instead of naming each archive independently.
Using wildcards reduces the space needed for storing the job
information in the case of a highly available resource manager
configuration.
This property should only be set to false for specific
jobs which are highly sensitive to the details of the archive
localization. Having this property set to true will cause the archives
to all be localized to the same local cache location. If false, each
archive will be localized to its own local cache location. In both
cases a symbolic link will be created to every archive from the job's
working directory.
</description>
</property>
<property>
<name>mapreduce.task.profile</name>

View File

@ -17,11 +17,12 @@
*/
package org.apache.hadoop.mapreduce.filecache;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -37,6 +38,8 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.junit.After;
import org.junit.Assert;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
@ -55,22 +58,22 @@ public class TestClientDistributedCacheManager {
private static final Path TEST_VISIBILITY_CHILD_DIR =
new Path(TEST_VISIBILITY_PARENT_DIR, "TestCacheVisibility_Child");
private static final String FIRST_CACHE_FILE = "firstcachefile";
private static final String SECOND_CACHE_FILE = "secondcachefile";
private FileSystem fs;
private Path firstCacheFile;
private Path secondCacheFile;
private Path thirdCacheFile;
private Configuration conf;
@Before
public void setup() throws IOException {
conf = new Configuration();
fs = FileSystem.get(conf);
firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile");
thirdCacheFile = new Path(TEST_VISIBILITY_CHILD_DIR,"thirdCachefile");
firstCacheFile = new Path(TEST_VISIBILITY_PARENT_DIR, FIRST_CACHE_FILE);
secondCacheFile = new Path(TEST_VISIBILITY_CHILD_DIR, SECOND_CACHE_FILE);
createTempFile(firstCacheFile, conf);
createTempFile(secondCacheFile, conf);
createTempFile(thirdCacheFile, conf);
}
@After
@ -88,37 +91,147 @@ public class TestClientDistributedCacheManager {
job.addCacheFile(secondCacheFile.toUri());
Configuration jobConf = job.getConfiguration();
Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
Map<URI, FileStatus> statCache = new HashMap<>();
ClientDistributedCacheManager.determineTimestamps(jobConf, statCache);
FileStatus firstStatus = statCache.get(firstCacheFile.toUri());
FileStatus secondStatus = statCache.get(secondCacheFile.toUri());
Assert.assertNotNull(firstStatus);
Assert.assertNotNull(secondStatus);
Assert.assertEquals(2, statCache.size());
Assert.assertNotNull(firstCacheFile + " was not found in the stats cache",
firstStatus);
Assert.assertNotNull(secondCacheFile + " was not found in the stats cache",
secondStatus);
Assert.assertEquals("Missing/extra entries found in the stas cache",
2, statCache.size());
String expected = firstStatus.getModificationTime() + ","
+ secondStatus.getModificationTime();
Assert.assertEquals(expected, jobConf.get(MRJobConfig.CACHE_FILE_TIMESTAMPS));
job = Job.getInstance(conf);
job.addCacheFile(new Path(TEST_VISIBILITY_CHILD_DIR, "*").toUri());
jobConf = job.getConfiguration();
statCache.clear();
ClientDistributedCacheManager.determineTimestamps(jobConf, statCache);
FileStatus thirdStatus = statCache.get(TEST_VISIBILITY_CHILD_DIR.toUri());
Assert.assertEquals("Missing/extra entries found in the stas cache",
1, statCache.size());
Assert.assertNotNull(TEST_VISIBILITY_CHILD_DIR
+ " was not found in the stats cache", thirdStatus);
expected = Long.toString(thirdStatus.getModificationTime());
Assert.assertEquals("Incorrect timestamp for " + TEST_VISIBILITY_CHILD_DIR,
expected, jobConf.get(MRJobConfig.CACHE_FILE_TIMESTAMPS));
}
@Test
public void testDetermineCacheVisibilities() throws IOException {
fs.setWorkingDirectory(TEST_VISIBILITY_CHILD_DIR);
fs.setPermission(TEST_VISIBILITY_PARENT_DIR,
new FsPermission((short)00777));
fs.setPermission(TEST_VISIBILITY_CHILD_DIR,
new FsPermission((short)00777));
fs.setWorkingDirectory(TEST_VISIBILITY_CHILD_DIR);
Job job = Job.getInstance(conf);
Path relativePath = new Path(SECOND_CACHE_FILE);
Path wildcardPath = new Path("*");
Map<URI, FileStatus> statCache = new HashMap<>();
Configuration jobConf;
job.addCacheFile(firstCacheFile.toUri());
job.addCacheFile(relativePath.toUri());
jobConf = job.getConfiguration();
ClientDistributedCacheManager.determineCacheVisibilities(jobConf,
statCache);
// We use get() instead of getBoolean() so we can tell the difference
// between wrong and missing
assertEquals("The file paths were not found to be publicly visible "
+ "even though the full path is publicly accessible",
"true,true", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES));
checkCacheEntries(statCache, null, firstCacheFile, relativePath);
job = Job.getInstance(conf);
job.addCacheFile(wildcardPath.toUri());
jobConf = job.getConfiguration();
statCache.clear();
ClientDistributedCacheManager.determineCacheVisibilities(jobConf,
statCache);
// We use get() instead of getBoolean() so we can tell the difference
// between wrong and missing
assertEquals("The file path was not found to be publicly visible "
+ "even though the full path is publicly accessible",
"true", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES));
checkCacheEntries(statCache, null, wildcardPath.getParent());
Path qualifiedParent = fs.makeQualified(TEST_VISIBILITY_PARENT_DIR);
fs.setPermission(TEST_VISIBILITY_PARENT_DIR,
new FsPermission((short)00700));
Job job = Job.getInstance(conf);
Path relativePath = new Path("thirdCachefile");
job = Job.getInstance(conf);
job.addCacheFile(firstCacheFile.toUri());
job.addCacheFile(relativePath.toUri());
Configuration jobConf = job.getConfiguration();
jobConf = job.getConfiguration();
statCache.clear();
Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
ClientDistributedCacheManager.
determineCacheVisibilities(jobConf, statCache);
Assert.assertFalse(jobConf.
getBoolean(MRJobConfig.CACHE_FILE_VISIBILITIES,true));
ClientDistributedCacheManager.determineCacheVisibilities(jobConf,
statCache);
// We use get() instead of getBoolean() so we can tell the difference
// between wrong and missing
assertEquals("The file paths were found to be publicly visible "
+ "even though the parent directory is not publicly accessible",
"false,false", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES));
checkCacheEntries(statCache, qualifiedParent,
firstCacheFile, relativePath);
job = Job.getInstance(conf);
job.addCacheFile(wildcardPath.toUri());
jobConf = job.getConfiguration();
statCache.clear();
ClientDistributedCacheManager.determineCacheVisibilities(jobConf,
statCache);
// We use get() instead of getBoolean() so we can tell the difference
// between wrong and missing
assertEquals("The file path was found to be publicly visible "
+ "even though the parent directory is not publicly accessible",
"false", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES));
checkCacheEntries(statCache, qualifiedParent, wildcardPath.getParent());
}
/**
* Validate that the file status cache contains all and only entries for a
* given set of paths up to a common parent.
*
* @param statCache the cache
* @param top the common parent at which to stop digging
* @param paths the paths to compare against the cache
*/
private void checkCacheEntries(Map<URI, FileStatus> statCache, Path top,
Path... paths) {
Set<URI> expected = new HashSet<>();
for (Path path : paths) {
Path p = fs.makeQualified(path);
while (!p.isRoot() && !p.equals(top)) {
expected.add(p.toUri());
p = p.getParent();
}
expected.add(p.toUri());
}
Set<URI> uris = statCache.keySet();
Set<URI> missing = new HashSet<>(uris);
Set<URI> extra = new HashSet<>(expected);
missing.removeAll(expected);
extra.removeAll(uris);
assertTrue("File status cache does not contain an entries for " + missing,
missing.isEmpty());
assertTrue("File status cache contains extra extries: " + extra,
extra.isEmpty());
}
@SuppressWarnings("deprecation")

View File

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.filecache;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* Test the {@link DistributedCache} class.
*/
public class TestDistributedCache {
/**
* Test of the addFileToClassPath method of the DistributedCache class.
*/
@Test
public void testAddFileToClassPath() throws Exception {
Configuration conf = new Configuration(false);
// Test first with 2 args
try {
DistributedCache.addFileToClassPath(null, conf);
fail("Accepted null archives argument");
} catch (NullPointerException ex) {
// Expected
}
DistributedCache.addFileToClassPath(new Path("file:///a"), conf);
assertEquals("The mapreduce.job.classpath.files property was not "
+ "set correctly", "file:/a", conf.get(MRJobConfig.CLASSPATH_FILES));
assertEquals("The mapreduce.job.cache.files property was not set "
+ "correctly", "file:///a", conf.get(MRJobConfig.CACHE_FILES));
DistributedCache.addFileToClassPath(new Path("file:///b"), conf);
assertEquals("The mapreduce.job.classpath.files property was not "
+ "set correctly", "file:/a,file:/b",
conf.get(MRJobConfig.CLASSPATH_FILES));
assertEquals("The mapreduce.job.cache.files property was not set "
+ "correctly", "file:///a,file:///b",
conf.get(MRJobConfig.CACHE_FILES));
// Now test with 3 args
FileSystem fs = FileSystem.newInstance(conf);
conf.clear();
try {
DistributedCache.addFileToClassPath(null, conf, fs);
fail("Accepted null archives argument");
} catch (NullPointerException ex) {
// Expected
}
DistributedCache.addFileToClassPath(new Path("file:///a"), conf, fs);
assertEquals("The mapreduce.job.classpath.files property was not "
+ "set correctly", "file:/a", conf.get(MRJobConfig.CLASSPATH_FILES));
assertEquals("The mapreduce.job.cache.files property was not set "
+ "correctly", "file:///a", conf.get(MRJobConfig.CACHE_FILES));
DistributedCache.addFileToClassPath(new Path("file:///b"), conf, fs);
assertEquals("The mapreduce.job.classpath.files property was not "
+ "set correctly", "file:/a,file:/b",
conf.get(MRJobConfig.CLASSPATH_FILES));
assertEquals("The mapreduce.job.cache.files property was not set "
+ "correctly", "file:///a,file:///b",
conf.get(MRJobConfig.CACHE_FILES));
// Now test with 4th arg true
conf.clear();
try {
DistributedCache.addFileToClassPath(null, conf, fs, true);
fail("Accepted null archives argument");
} catch (NullPointerException ex) {
// Expected
}
DistributedCache.addFileToClassPath(new Path("file:///a"), conf, fs, true);
assertEquals("The mapreduce.job.classpath.files property was not "
+ "set correctly", "file:/a", conf.get(MRJobConfig.CLASSPATH_FILES));
assertEquals("The mapreduce.job.cache.files property was not set "
+ "correctly", "file:///a", conf.get(MRJobConfig.CACHE_FILES));
DistributedCache.addFileToClassPath(new Path("file:///b"), conf, fs, true);
assertEquals("The mapreduce.job.classpath.files property was not "
+ "set correctly", "file:/a,file:/b",
conf.get(MRJobConfig.CLASSPATH_FILES));
assertEquals("The mapreduce.job.cache.files property was not set "
+ "correctly", "file:///a,file:///b",
conf.get(MRJobConfig.CACHE_FILES));
// And finally with 4th arg false
conf.clear();
try {
DistributedCache.addFileToClassPath(null, conf, fs, false);
fail("Accepted null archives argument");
} catch (NullPointerException ex) {
// Expected
}
DistributedCache.addFileToClassPath(new Path("file:///a"), conf, fs, false);
assertEquals("The mapreduce.job.classpath.files property was not "
+ "set correctly", "file:/a", conf.get(MRJobConfig.CLASSPATH_FILES));
assertEquals("The mapreduce.job.cache.files property was not set "
+ "correctly", "", conf.get(MRJobConfig.CACHE_FILES, ""));
DistributedCache.addFileToClassPath(new Path("file:///b"), conf, fs, false);
assertEquals("The mapreduce.job.classpath.files property was not "
+ "set correctly", "file:/a,file:/b",
conf.get(MRJobConfig.CLASSPATH_FILES));
assertEquals("The mapreduce.job.cache.files property was not set "
+ "correctly", "", conf.get(MRJobConfig.CACHE_FILES, ""));
}
}

View File

@ -18,23 +18,20 @@
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
@ -47,24 +44,31 @@ public class TestLocalJobSubmission {
private static Path TEST_ROOT_DIR =
new Path(System.getProperty("test.build.data","/tmp"));
@Before
public void configure() throws Exception {
}
@After
public void cleanup() {
}
/**
* test the local job submission options of
* -jt local -libjars.
* @throws IOException
* Test the local job submission options of -jt local -libjars.
*
* @throws IOException thrown if there's an error creating the JAR file
*/
@Test
public void testLocalJobLibjarsOption() throws IOException {
Configuration conf = new Configuration();
testLocalJobLibjarsOption(conf);
conf.setBoolean(Job.USE_WILDCARD_FOR_LIBJARS, false);
testLocalJobLibjarsOption(conf);
}
/**
* Test the local job submission options of -jt local -libjars.
*
* @param conf the {@link Configuration} to use
* @throws IOException thrown if there's an error creating the JAR file
*/
private void testLocalJobLibjarsOption(Configuration conf)
throws IOException {
Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
conf.set(MRConfig.FRAMEWORK_NAME, "local");
final String[] args = {

View File

@ -911,7 +911,8 @@ public class TestMRJobs {
}
}
public void _testDistributedCache(String jobJarPath) throws Exception {
private void testDistributedCache(String jobJarPath, boolean withWildcard)
throws Exception {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
@ -920,7 +921,7 @@ public class TestMRJobs {
// Create a temporary file of length 1.
Path first = createTempFile("distributed.first", "x");
// Create two jars with a single file inside them.
// Create three jars with a single file inside them.
Path second =
makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
Path third =
@ -929,16 +930,28 @@ public class TestMRJobs {
makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
Job job = Job.getInstance(mrCluster.getConfig());
// Set the job jar to a new "dummy" jar so we can check that its extracted
// properly
job.setJar(jobJarPath);
// Because the job jar is a "dummy" jar, we need to include the jar with
// DistributedCacheChecker or it won't be able to find it
Path distributedCacheCheckerJar = new Path(
JarFinder.getJar(DistributedCacheChecker.class));
job.addFileToClassPath(distributedCacheCheckerJar.makeQualified(
localFs.getUri(), distributedCacheCheckerJar.getParent()));
if (withWildcard) {
// If testing with wildcards, upload the DistributedCacheChecker into HDFS
// and add the directory as a wildcard.
Path libs = new Path("testLibs");
Path wildcard = remoteFs.makeQualified(new Path(libs, "*"));
remoteFs.mkdirs(libs);
remoteFs.copyFromLocalFile(third, libs);
job.addCacheFile(wildcard.toUri());
} else {
// Otherwise add the DistributedCacheChecker directly to the classpath.
// Because the job jar is a "dummy" jar, we need to include the jar with
// DistributedCacheChecker or it won't be able to find it
Path distributedCacheCheckerJar = new Path(
JarFinder.getJar(DistributedCacheChecker.class));
job.addFileToClassPath(localFs.makeQualified(distributedCacheCheckerJar));
}
job.setMapperClass(DistributedCacheChecker.class);
job.setOutputFormatClass(NullOutputFormat.class);
@ -964,11 +977,10 @@ public class TestMRJobs {
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
}
@Test (timeout = 600000)
public void testDistributedCache() throws Exception {
private void testDistributedCache(boolean withWildcard) throws Exception {
// Test with a local (file:///) Job Jar
Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString());
_testDistributedCache(localJobJarPath.toUri().toString());
testDistributedCache(localJobJarPath.toUri().toString(), withWildcard);
// Test with a remote (hdfs://) Job Jar
Path remoteJobJarPath = new Path(remoteFs.getUri().toString() + "/",
@ -978,7 +990,17 @@ public class TestMRJobs {
if (localJobJarFile.exists()) { // just to make sure
localJobJarFile.delete();
}
_testDistributedCache(remoteJobJarPath.toUri().toString());
testDistributedCache(remoteJobJarPath.toUri().toString(), withWildcard);
}
@Test (timeout = 300000)
public void testDistributedCache() throws Exception {
testDistributedCache(false);
}
@Test (timeout = 300000)
public void testDistributedCacheWithWildcards() throws Exception {
testDistributedCache(true);
}
@Test(timeout = 120000)