MAPREDUCE-4549. Distributed cache conflicts breaks backwards compatability. (Robert Evans via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1420363 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
680b437f2f
commit
1fce56fd24
@ -958,6 +958,9 @@ Release 0.23.3
|
|||||||
MAPREDUCE-4641. Exception in commitJob marks job as successful in job
|
MAPREDUCE-4641. Exception in commitJob marks job as successful in job
|
||||||
history (Jason Lowe via bobby)
|
history (Jason Lowe via bobby)
|
||||||
|
|
||||||
|
MAPREDUCE-4549. Distributed cache conflicts breaks backwards compatability
|
||||||
|
(Robert Evans via tucu)
|
||||||
|
|
||||||
Release 0.23.2 - UNRELEASED
|
Release 0.23.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -25,6 +25,8 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@ -54,14 +56,14 @@
|
|||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
|
||||||
import com.google.common.base.Charsets;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper class for MR applications
|
* Helper class for MR applications
|
||||||
*/
|
*/
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public class MRApps extends Apps {
|
public class MRApps extends Apps {
|
||||||
|
private static final Log LOG = LogFactory.getLog(MRApps.class);
|
||||||
|
|
||||||
public static String toString(JobId jid) {
|
public static String toString(JobId jid) {
|
||||||
return jid.toString();
|
return jid.toString();
|
||||||
}
|
}
|
||||||
@ -296,6 +298,16 @@ private static String getResourceDescription(LocalResourceType type) {
|
|||||||
return "cache file (" + MRJobConfig.CACHE_FILES + ") ";
|
return "cache file (" + MRJobConfig.CACHE_FILES + ") ";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String toString(org.apache.hadoop.yarn.api.records.URL url) {
|
||||||
|
StringBuffer b = new StringBuffer();
|
||||||
|
b.append(url.getScheme()).append("://").append(url.getHost());
|
||||||
|
if(url.getPort() >= 0) {
|
||||||
|
b.append(":").append(url.getPort());
|
||||||
|
}
|
||||||
|
b.append(url.getFile());
|
||||||
|
return b.toString();
|
||||||
|
}
|
||||||
|
|
||||||
// TODO - Move this to MR!
|
// TODO - Move this to MR!
|
||||||
// Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
|
// Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
|
||||||
// long[], boolean[], Path[], FileType)
|
// long[], boolean[], Path[], FileType)
|
||||||
@ -333,11 +345,15 @@ private static void parseDistributedCacheArtifacts(
|
|||||||
}
|
}
|
||||||
String linkName = name.toUri().getPath();
|
String linkName = name.toUri().getPath();
|
||||||
LocalResource orig = localResources.get(linkName);
|
LocalResource orig = localResources.get(linkName);
|
||||||
if(orig != null && !orig.getResource().equals(
|
org.apache.hadoop.yarn.api.records.URL url =
|
||||||
ConverterUtils.getYarnUrlFromURI(p.toUri()))) {
|
ConverterUtils.getYarnUrlFromURI(p.toUri());
|
||||||
throw new InvalidJobConfException(
|
if(orig != null && !orig.getResource().equals(url)) {
|
||||||
getResourceDescription(orig.getType()) + orig.getResource() +
|
LOG.warn(
|
||||||
" conflicts with " + getResourceDescription(type) + u);
|
getResourceDescription(orig.getType()) +
|
||||||
|
toString(orig.getResource()) + " conflicts with " +
|
||||||
|
getResourceDescription(type) + toString(url) +
|
||||||
|
" This will be an error in Hadoop 2.0");
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
localResources.put(
|
localResources.put(
|
||||||
linkName,
|
linkName,
|
||||||
|
@ -29,7 +29,6 @@
|
|||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FilterFileSystem;
|
import org.apache.hadoop.fs.FilterFileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.mapred.InvalidJobConfException;
|
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.JobID;
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
@ -38,7 +37,6 @@
|
|||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||||
@ -47,6 +45,7 @@
|
|||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
@ -245,7 +244,7 @@ public void testSetupDistributedCacheEmpty() throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
@Test(expected = InvalidJobConfException.class)
|
@Test
|
||||||
public void testSetupDistributedCacheConflicts() throws Exception {
|
public void testSetupDistributedCacheConflicts() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
||||||
@ -273,10 +272,18 @@ public void testSetupDistributedCacheConflicts() throws Exception {
|
|||||||
Map<String, LocalResource> localResources =
|
Map<String, LocalResource> localResources =
|
||||||
new HashMap<String, LocalResource>();
|
new HashMap<String, LocalResource>();
|
||||||
MRApps.setupDistributedCache(conf, localResources);
|
MRApps.setupDistributedCache(conf, localResources);
|
||||||
|
|
||||||
|
assertEquals(1, localResources.size());
|
||||||
|
LocalResource lr = localResources.get("something");
|
||||||
|
//Archive wins
|
||||||
|
assertNotNull(lr);
|
||||||
|
assertEquals(10l, lr.getSize());
|
||||||
|
assertEquals(10l, lr.getTimestamp());
|
||||||
|
assertEquals(LocalResourceType.ARCHIVE, lr.getType());
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
@Test(expected = InvalidJobConfException.class)
|
@Test
|
||||||
public void testSetupDistributedCacheConflictsFiles() throws Exception {
|
public void testSetupDistributedCacheConflictsFiles() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
||||||
@ -301,6 +308,14 @@ public void testSetupDistributedCacheConflictsFiles() throws Exception {
|
|||||||
Map<String, LocalResource> localResources =
|
Map<String, LocalResource> localResources =
|
||||||
new HashMap<String, LocalResource>();
|
new HashMap<String, LocalResource>();
|
||||||
MRApps.setupDistributedCache(conf, localResources);
|
MRApps.setupDistributedCache(conf, localResources);
|
||||||
|
|
||||||
|
assertEquals(1, localResources.size());
|
||||||
|
LocalResource lr = localResources.get("something");
|
||||||
|
//First one wins
|
||||||
|
assertNotNull(lr);
|
||||||
|
assertEquals(10l, lr.getSize());
|
||||||
|
assertEquals(10l, lr.getTimestamp());
|
||||||
|
assertEquals(LocalResourceType.FILE, lr.getType());
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user