svn merge -c 1369197 FIXES: MAPREDUCE-4503. Should throw InvalidJobConfException if duplicates found in cacheArchives or cacheFiles (Robert Evans via jeagles)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1369201 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
99050a6508
commit
054dd2d39c
|
@ -671,6 +671,9 @@ Release 0.23.3 - UNRELEASED
|
|||
MAPREDUCE-4504. SortValidator writes to wrong directory (Robert Evans
|
||||
via tgraves)
|
||||
|
||||
MAPREDUCE-4503. Should throw InvalidJobConfException if duplicates found in
|
||||
cacheArchives or cacheFiles (Robert Evans via jeagles)
|
||||
|
||||
Release 0.23.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.InvalidJobConfException;
|
||||
import org.apache.hadoop.mapreduce.JobID;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||
|
@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.util.Apps;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
|
||||
/**
|
||||
* Helper class for MR applications
|
||||
|
@ -264,6 +266,13 @@ public class MRApps extends Apps {
|
|||
DistributedCache.getFileClassPaths(conf));
|
||||
}
|
||||
|
||||
private static String getResourceDescription(LocalResourceType type) {
|
||||
if(type == LocalResourceType.ARCHIVE) {
|
||||
return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") ";
|
||||
}
|
||||
return "cache file (" + MRJobConfig.CACHE_FILES + ") ";
|
||||
}
|
||||
|
||||
// TODO - Move this to MR!
|
||||
// Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
|
||||
// long[], boolean[], Path[], FileType)
|
||||
|
@ -309,6 +318,13 @@ public class MRApps extends Apps {
|
|||
throw new IllegalArgumentException("Resource name must be relative");
|
||||
}
|
||||
String linkName = name.toUri().getPath();
|
||||
LocalResource orig = localResources.get(linkName);
|
||||
if(orig != null && !orig.getResource().equals(
|
||||
ConverterUtils.getYarnUrlFromURI(p.toUri()))) {
|
||||
throw new InvalidJobConfException(
|
||||
getResourceDescription(orig.getType()) + orig.getResource() +
|
||||
" conflicts with " + getResourceDescription(type) + u);
|
||||
}
|
||||
localResources.put(
|
||||
linkName,
|
||||
BuilderUtils.newLocalResource(
|
||||
|
|
|
@ -19,25 +19,33 @@
|
|||
package org.apache.hadoop.mapreduce.v2.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FilterFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.InvalidJobConfException;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.JobID;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class TestMRApps {
|
||||
|
||||
|
@ -167,4 +175,121 @@ public class TestMRApps {
|
|||
env_str.indexOf("$PWD:job.jar"), 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetupDistributedCacheEmpty() throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
|
||||
MRApps.setupDistributedCache(conf, localResources);
|
||||
assertTrue("Empty Config did not produce an empty list of resources",
|
||||
localResources.isEmpty());
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test(expected = InvalidJobConfException.class)
|
||||
public void testSetupDistributedCacheConflicts() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
||||
|
||||
URI mockUri = URI.create("mockfs://mock/");
|
||||
FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
|
||||
.getRawFileSystem();
|
||||
|
||||
URI archive = new URI("mockfs://mock/tmp/something.zip#something");
|
||||
Path archivePath = new Path(archive);
|
||||
URI file = new URI("mockfs://mock/tmp/something.txt#something");
|
||||
Path filePath = new Path(file);
|
||||
|
||||
when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
|
||||
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
|
||||
|
||||
DistributedCache.addCacheArchive(archive, conf);
|
||||
conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
|
||||
conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
|
||||
conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
|
||||
DistributedCache.addCacheFile(file, conf);
|
||||
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
|
||||
conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
|
||||
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
|
||||
Map<String, LocalResource> localResources =
|
||||
new HashMap<String, LocalResource>();
|
||||
MRApps.setupDistributedCache(conf, localResources);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test(expected = InvalidJobConfException.class)
|
||||
public void testSetupDistributedCacheConflictsFiles() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
||||
|
||||
URI mockUri = URI.create("mockfs://mock/");
|
||||
FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
|
||||
.getRawFileSystem();
|
||||
|
||||
URI file = new URI("mockfs://mock/tmp/something.zip#something");
|
||||
Path filePath = new Path(file);
|
||||
URI file2 = new URI("mockfs://mock/tmp/something.txt#something");
|
||||
Path file2Path = new Path(file);
|
||||
|
||||
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
|
||||
when(mockFs.resolvePath(file2Path)).thenReturn(file2Path);
|
||||
|
||||
DistributedCache.addCacheFile(file, conf);
|
||||
DistributedCache.addCacheFile(file2, conf);
|
||||
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11");
|
||||
conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11");
|
||||
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true");
|
||||
Map<String, LocalResource> localResources =
|
||||
new HashMap<String, LocalResource>();
|
||||
MRApps.setupDistributedCache(conf, localResources);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
public void testSetupDistributedCache() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
||||
|
||||
URI mockUri = URI.create("mockfs://mock/");
|
||||
FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
|
||||
.getRawFileSystem();
|
||||
|
||||
URI archive = new URI("mockfs://mock/tmp/something.zip");
|
||||
Path archivePath = new Path(archive);
|
||||
URI file = new URI("mockfs://mock/tmp/something.txt#something");
|
||||
Path filePath = new Path(file);
|
||||
|
||||
when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
|
||||
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
|
||||
|
||||
DistributedCache.addCacheArchive(archive, conf);
|
||||
conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
|
||||
conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
|
||||
conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
|
||||
DistributedCache.addCacheFile(file, conf);
|
||||
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
|
||||
conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
|
||||
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
|
||||
Map<String, LocalResource> localResources =
|
||||
new HashMap<String, LocalResource>();
|
||||
MRApps.setupDistributedCache(conf, localResources);
|
||||
assertEquals(2, localResources.size());
|
||||
LocalResource lr = localResources.get("something.zip");
|
||||
assertNotNull(lr);
|
||||
assertEquals(10l, lr.getSize());
|
||||
assertEquals(10l, lr.getTimestamp());
|
||||
assertEquals(LocalResourceType.ARCHIVE, lr.getType());
|
||||
lr = localResources.get("something");
|
||||
assertNotNull(lr);
|
||||
assertEquals(11l, lr.getSize());
|
||||
assertEquals(11l, lr.getTimestamp());
|
||||
assertEquals(LocalResourceType.FILE, lr.getType());
|
||||
}
|
||||
|
||||
static class MockFileSystem extends FilterFileSystem {
|
||||
MockFileSystem() {
|
||||
super(mock(FileSystem.class));
|
||||
}
|
||||
public void initialize(URI name, Configuration conf) throws IOException {}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue