merge -r 1390556:1390557 from trunk. FIXES: MAPREDUCE-4647

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1390563 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2012-09-26 15:25:05 +00:00
parent ff5f50532a
commit e5e1887092
20 changed files with 203 additions and 41 deletions

View File

@ -422,6 +422,9 @@ Release 0.23.4 - UNRELEASED
BUG FIXES
MAPREDUCE-4647. We should only unjar jobjar if there is a lib directory
in it. (Robert Evans via tgraves)
Release 0.23.3 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.MapReduceChildJVM;
import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapred.Task;
@ -610,10 +611,12 @@ public abstract class TaskAttemptImpl implements
if (jobJar != null) {
Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS
.getUri(), remoteFS.getWorkingDirectory());
localResources.put(
MRJobConfig.JOB_JAR,
createLocalResource(remoteFS, remoteJobJar,
LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION));
LocalResource rc = createLocalResource(remoteFS, remoteJobJar,
LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION);
String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
rc.setPattern(pattern);
localResources.put(MRJobConfig.JOB_JAR, rc);
LOG.info("The job-jar file on the remote FS is "
+ remoteJobJar.toUri().toASCIIString());
} else {

View File

@ -152,6 +152,10 @@ class LocalDistributedCacheManager {
localArchives.add(pathString);
} else if (resource.getType() == LocalResourceType.FILE) {
localFiles.add(pathString);
} else if (resource.getType() == LocalResourceType.PATTERN) {
//PATTERN is not currently used in local mode
throw new IllegalArgumentException("Resource type PATTERN is not " +
"implemented yet. " + resource.getResource());
}
Path resourcePath;
try {

View File

@ -210,7 +210,7 @@ public class MRApps extends Apps {
Apps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
MRJobConfig.JOB_JAR + Path.SEPARATOR);
MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR);
Apps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
@ -281,7 +281,7 @@ public class MRApps extends Apps {
}
private static String getResourceDescription(LocalResourceType type) {
if(type == LocalResourceType.ARCHIVE) {
if(type == LocalResourceType.ARCHIVE || type == LocalResourceType.PATTERN) {
return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") ";
}
return "cache file (" + MRJobConfig.CACHE_FILES + ") ";

View File

@ -166,7 +166,7 @@ public class TestMRApps {
}
String env_str = env.get("CLASSPATH");
assertSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
env_str.indexOf("$PWD:job.jar/:job.jar/classes/:job.jar/lib/*:$PWD/*"), 0);
env_str.indexOf("$PWD:job.jar/job.jar:job.jar/classes/:job.jar/lib/*:$PWD/*"), 0);
}
@Test public void testSetClasspathWithNoUserPrecendence() {
@ -180,7 +180,7 @@ public class TestMRApps {
}
String env_str = env.get("CLASSPATH");
int index =
env_str.indexOf("job.jar/:job.jar/classes/:job.jar/lib/*:$PWD/*");
env_str.indexOf("job.jar/job.jar:job.jar/classes/:job.jar/lib/*:$PWD/*");
assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not"
+ " in the classpath!", index, -1);
assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",

View File

@ -346,9 +346,13 @@ public class YARNRunner implements ClientProtocol {
jobConfPath, LocalResourceType.FILE));
if (jobConf.get(MRJobConfig.JAR) != null) {
Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
localResources.put(MRJobConfig.JOB_JAR,
createApplicationResource(defaultFileContext,
jobJarPath, LocalResourceType.ARCHIVE));
LocalResource rc = createApplicationResource(defaultFileContext,
jobJarPath,
LocalResourceType.PATTERN);
String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
rc.setPattern(pattern);
localResources.put(MRJobConfig.JOB_JAR, rc);
} else {
// Job jar may be null. For e.g, for pipes, the job jar is the hadoop
// mapreduce jar itself which is already on the classpath.

View File

@ -106,4 +106,20 @@ public interface LocalResource {
* localized
*/
public void setVisibility(LocalResourceVisibility visibility);
/**
* Get the <em>pattern</em> that should be used to extract entries from the
* archive (only used when type is <code>PATTERN</code>).
* @return <em>pattern</em> that should be used to extract entries from the
* archive.
*/
public String getPattern();
/**
* Set the <em>pattern</em> that should be used to extract entries from the
* archive (only used when type is <code>PATTERN</code>).
* @param pattern <em>pattern</em> that should be used to extract entries
* from the archive.
*/
public void setPattern(String pattern);
}

View File

@ -55,5 +55,14 @@ public enum LocalResourceType {
/**
* Regular file i.e. uninterpreted bytes.
*/
FILE
FILE,
/**
* A hybrid between archive and file. Only part of the file is unarchived,
* and the original file is left in place, but in the same directory as the
* unarchived part. The part that is unarchived is determined by pattern
* in #{@link LocalResource}. Currently only jars support pattern, all
* others will be treated like a #{@link LocalResourceType#ARCHIVE}.
*/
PATTERN
}

View File

@ -152,6 +152,25 @@ public class LocalResourcePBImpl extends ProtoBase<LocalResourceProto>
builder.setVisibility(convertToProtoFormat(visibility));
}
@Override
public synchronized String getPattern() {
LocalResourceProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasPattern()) {
return null;
}
return p.getPattern();
}
@Override
public synchronized void setPattern(String pattern) {
maybeInitBuilder();
if (pattern == null) {
builder.clearPattern();
return;
}
builder.setPattern(pattern);
}
private LocalResourceTypeProto convertToProtoFormat(LocalResourceType e) {
return ProtoUtils.convertToProtoFormat(e);
}

View File

@ -135,6 +135,7 @@ enum LocalResourceVisibilityProto {
enum LocalResourceTypeProto {
ARCHIVE = 1;
FILE = 2;
PATTERN = 3;
}
message LocalResourceProto {
@ -143,6 +144,7 @@ message LocalResourceProto {
optional int64 timestamp = 3;
optional LocalResourceTypeProto type = 4;
optional LocalResourceVisibilityProto visibility = 5;
optional string pattern = 6;
}
message ApplicationResourceUsageReportProto {

View File

@ -25,6 +25,7 @@ import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -103,9 +104,9 @@ public class FSDownload implements Callable<Path> {
return dCopy;
}
private long unpack(File localrsrc, File dst) throws IOException {
private long unpack(File localrsrc, File dst, Pattern pattern) throws IOException {
switch (resource.getType()) {
case ARCHIVE:
case ARCHIVE: {
String lowerDst = dst.getName().toLowerCase();
if (lowerDst.endsWith(".jar")) {
RunJar.unJar(localrsrc, dst);
@ -122,6 +123,38 @@ public class FSDownload implements Callable<Path> {
+ "] to [" + dst + "]");
}
}
}
break;
case PATTERN: {
String lowerDst = dst.getName().toLowerCase();
if (lowerDst.endsWith(".jar")) {
RunJar.unJar(localrsrc, dst, pattern);
File newDst = new File(dst, dst.getName());
if (!dst.exists() && !dst.mkdir()) {
throw new IOException("Unable to create directory: [" + dst + "]");
}
if (!localrsrc.renameTo(newDst)) {
throw new IOException("Unable to rename file: [" + localrsrc
+ "] to [" + newDst + "]");
}
} else if (lowerDst.endsWith(".zip")) {
LOG.warn("Treating [" + localrsrc + "] as an archive even though it " +
"was specified as PATTERN");
FileUtil.unZip(localrsrc, dst);
} else if (lowerDst.endsWith(".tar.gz") ||
lowerDst.endsWith(".tgz") ||
lowerDst.endsWith(".tar")) {
LOG.warn("Treating [" + localrsrc + "] as an archive even though it " +
"was specified as PATTERN");
FileUtil.unTar(localrsrc, dst);
} else {
LOG.warn("Cannot unpack " + localrsrc);
if (!localrsrc.renameTo(dst)) {
throw new IOException("Unable to rename file: [" + localrsrc
+ "] to [" + dst + "]");
}
}
}
break;
case FILE:
default:
@ -164,7 +197,12 @@ public class FSDownload implements Callable<Path> {
return files.makeQualified(copy(sCopy, dst_work));
};
});
unpack(new File(dTmp.toUri()), new File(dFinal.toUri()));
Pattern pattern = null;
String p = resource.getPattern();
if(p != null) {
pattern = Pattern.compile(p);
}
unpack(new File(dTmp.toUri()), new File(dFinal.toUri()), pattern);
changePermissions(dFinal.getFileSystem(conf), dFinal);
files.rename(dst_work, destDirPath, Rename.OVERWRITE);
} catch (Exception e) {

View File

@ -107,8 +107,9 @@ public class TestFSDownload {
FileStatus status = files.getFileStatus(p);
ret.setSize(status.getLen());
ret.setTimestamp(status.getModificationTime());
ret.setType(LocalResourceType.ARCHIVE);
ret.setType(LocalResourceType.PATTERN);
ret.setVisibility(vis);
ret.setPattern("classes/.*");
return ret;
}

View File

@ -209,6 +209,7 @@ public class ContainerLocalizer {
}
switch (rsrc.getType()) {
case ARCHIVE:
case PATTERN:
return 5 * rsrc.getSize();
case FILE:
default:

View File

@ -34,6 +34,7 @@ public class LocalResourceRequest
private final long timestamp;
private final LocalResourceType type;
private final LocalResourceVisibility visibility;
private final String pattern;
/**
* Wrap API resource to match against cache of localized resources.
@ -45,22 +46,28 @@ public class LocalResourceRequest
this(ConverterUtils.getPathFromYarnURL(resource.getResource()),
resource.getTimestamp(),
resource.getType(),
resource.getVisibility());
resource.getVisibility(),
resource.getPattern());
}
LocalResourceRequest(Path loc, long timestamp, LocalResourceType type,
LocalResourceVisibility visibility) {
LocalResourceVisibility visibility, String pattern) {
this.loc = loc;
this.timestamp = timestamp;
this.type = type;
this.visibility = visibility;
this.pattern = pattern;
}
@Override
public int hashCode() {
return loc.hashCode() ^
int hash = loc.hashCode() ^
(int)((timestamp >>> 32) ^ timestamp) *
type.hashCode();
if(pattern != null) {
hash = hash ^ pattern.hashCode();
}
return hash;
}
@Override
@ -72,9 +79,14 @@ public class LocalResourceRequest
return false;
}
final LocalResourceRequest other = (LocalResourceRequest) o;
String pattern = getPattern();
String otherPattern = other.getPattern();
boolean patternEquals = (pattern == null && otherPattern == null) ||
(pattern != null && otherPattern != null && pattern.equals(otherPattern));
return getPath().equals(other.getPath()) &&
getTimestamp() == other.getTimestamp() &&
getType() == other.getType();
getType() == other.getType() &&
patternEquals;
}
@Override
@ -87,6 +99,19 @@ public class LocalResourceRequest
ret = (int)(getTimestamp() - other.getTimestamp());
if (0 == ret) {
ret = getType().ordinal() - other.getType().ordinal();
if (0 == ret) {
String pattern = getPattern();
String otherPattern = other.getPattern();
if (pattern == null && otherPattern == null) {
ret = 0;
} else if (pattern == null) {
ret = -1;
} else if (otherPattern == null) {
ret = 1;
} else {
ret = pattern.compareTo(otherPattern);
}
}
}
}
return ret;
@ -121,6 +146,11 @@ public class LocalResourceRequest
return visibility;
}
@Override
public String getPattern() {
return pattern;
}
@Override
public void setResource(URL resource) {
throw new UnsupportedOperationException();
@ -146,13 +176,19 @@ public class LocalResourceRequest
throw new UnsupportedOperationException();
}
@Override
public void setPattern(String pattern) {
throw new UnsupportedOperationException();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{ ");
sb.append(getPath().toString()).append(", ");
sb.append(getTimestamp()).append(", ");
sb.append(getType()).append(" }");
sb.append(getType()).append(", ");
sb.append(getPattern()).append(" }");
return sb.toString();
}
}

View File

@ -219,7 +219,8 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
ContainerId container = ctxt.getContainerId();
rsrc.ref.add(container);
rsrc.dispatcher.getEventHandler().handle(
new LocalizerResourceRequestEvent(rsrc, req.getVisibility(), ctxt));
new LocalizerResourceRequestEvent(rsrc, req.getVisibility(), ctxt,
req.getLocalResourceRequest().getPattern()));
}
}

View File

@ -752,6 +752,7 @@ public class ResourceLocalizationService extends CompositeService
next.setTimestamp(nextRsrc.getTimestamp());
next.setType(nextRsrc.getType());
next.setVisibility(evt.getVisibility());
next.setPattern(evt.getPattern());
scheduled.put(nextRsrc, evt);
return next;
}

View File

@ -32,14 +32,16 @@ public class LocalizerResourceRequestEvent extends LocalizerEvent {
private final LocalizerContext context;
private final LocalizedResource resource;
private final LocalResourceVisibility vis;
private final String pattern;
public LocalizerResourceRequestEvent(LocalizedResource resource,
LocalResourceVisibility vis, LocalizerContext context) {
LocalResourceVisibility vis, LocalizerContext context, String pattern) {
super(LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION,
ConverterUtils.toString(context.getContainerId()));
this.vis = vis;
this.context = context;
this.resource = resource;
this.pattern = pattern;
}
public LocalizedResource getResource() {
@ -54,4 +56,8 @@ public class LocalizerResourceRequestEvent extends LocalizerEvent {
return vis;
}
public String getPattern() {
return pattern;
}
}

View File

@ -37,7 +37,7 @@ import static org.junit.Assert.*;
public class TestLocalResource {
static org.apache.hadoop.yarn.api.records.LocalResource getYarnResource(Path p, long size,
long timestamp, LocalResourceType type, LocalResourceVisibility state)
long timestamp, LocalResourceType type, LocalResourceVisibility state, String pattern)
throws URISyntaxException {
org.apache.hadoop.yarn.api.records.LocalResource ret = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(org.apache.hadoop.yarn.api.records.LocalResource.class);
ret.setResource(ConverterUtils.getYarnUrlFromURI(p.toUri()));
@ -45,6 +45,7 @@ public class TestLocalResource {
ret.setTimestamp(timestamp);
ret.setType(type);
ret.setVisibility(state);
ret.setPattern(pattern);
return ret;
}
@ -72,9 +73,9 @@ public class TestLocalResource {
long basetime = r.nextLong() >>> 2;
org.apache.hadoop.yarn.api.records.LocalResource yA = getYarnResource(
new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC);
new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC, null);
org.apache.hadoop.yarn.api.records.LocalResource yB = getYarnResource(
new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC);
new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC, null);
final LocalResourceRequest a = new LocalResourceRequest(yA);
LocalResourceRequest b = new LocalResourceRequest(yA);
checkEqual(a, b);
@ -83,31 +84,37 @@ public class TestLocalResource {
// ignore visibility
yB = getYarnResource(
new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PRIVATE);
new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PRIVATE, null);
b = new LocalResourceRequest(yB);
checkEqual(a, b);
// ignore size
yB = getYarnResource(
new Path("http://yak.org:80/foobar"), 0, basetime, FILE, PRIVATE);
new Path("http://yak.org:80/foobar"), 0, basetime, FILE, PRIVATE, null);
b = new LocalResourceRequest(yB);
checkEqual(a, b);
// note path
yB = getYarnResource(
new Path("hdfs://dingo.org:80/foobar"), 0, basetime, ARCHIVE, PUBLIC);
new Path("hdfs://dingo.org:80/foobar"), 0, basetime, ARCHIVE, PUBLIC, null);
b = new LocalResourceRequest(yB);
checkNotEqual(a, b);
// note type
yB = getYarnResource(
new Path("http://yak.org:80/foobar"), 0, basetime, ARCHIVE, PUBLIC);
new Path("http://yak.org:80/foobar"), 0, basetime, ARCHIVE, PUBLIC, null);
b = new LocalResourceRequest(yB);
checkNotEqual(a, b);
// note timestamp
yB = getYarnResource(
new Path("http://yak.org:80/foobar"), 0, basetime + 1, FILE, PUBLIC);
new Path("http://yak.org:80/foobar"), 0, basetime + 1, FILE, PUBLIC, null);
b = new LocalResourceRequest(yB);
checkNotEqual(a, b);
// note pattern
yB = getYarnResource(
new Path("http://yak.org:80/foobar"), 0, basetime + 1, FILE, PUBLIC, "^/foo/.*");
b = new LocalResourceRequest(yB);
checkNotEqual(a, b);
}
@ -120,24 +127,35 @@ public class TestLocalResource {
System.out.println("SEED: " + seed);
long basetime = r.nextLong() >>> 2;
org.apache.hadoop.yarn.api.records.LocalResource yA = getYarnResource(
new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC);
new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC, "^/foo/.*");
final LocalResourceRequest a = new LocalResourceRequest(yA);
// Path primary
org.apache.hadoop.yarn.api.records.LocalResource yB = getYarnResource(
new Path("http://yak.org:80/foobaz"), -1, basetime, FILE, PUBLIC);
new Path("http://yak.org:80/foobaz"), -1, basetime, FILE, PUBLIC, "^/foo/.*");
LocalResourceRequest b = new LocalResourceRequest(yB);
assertTrue(0 > a.compareTo(b));
// timestamp secondary
yB = getYarnResource(
new Path("http://yak.org:80/foobar"), -1, basetime + 1, FILE, PUBLIC);
new Path("http://yak.org:80/foobar"), -1, basetime + 1, FILE, PUBLIC, "^/foo/.*");
b = new LocalResourceRequest(yB);
assertTrue(0 > a.compareTo(b));
// type tertiary
yB = getYarnResource(
new Path("http://yak.org:80/foobar"), -1, basetime, ARCHIVE, PUBLIC);
new Path("http://yak.org:80/foobar"), -1, basetime, ARCHIVE, PUBLIC, "^/foo/.*");
b = new LocalResourceRequest(yB);
assertTrue(0 != a.compareTo(b)); // don't care about order, just ne
// path 4th
yB = getYarnResource(
new Path("http://yak.org:80/foobar"), -1, basetime, ARCHIVE, PUBLIC, "^/food/.*");
b = new LocalResourceRequest(yB);
assertTrue(0 != a.compareTo(b)); // don't care about order, just ne
yB = getYarnResource(
new Path("http://yak.org:80/foobar"), -1, basetime, ARCHIVE, PUBLIC, null);
b = new LocalResourceRequest(yB);
assertTrue(0 != a.compareTo(b)); // don't care about order, just ne
}

View File

@ -230,7 +230,7 @@ public class TestLocalResourcesTrackerImpl {
long ts, LocalResourceVisibility vis) {
final LocalResourceRequest req =
new LocalResourceRequest(new Path("file:///tmp/" + user + "/rsrc" + i),
ts + i * 2000, LocalResourceType.FILE, vis);
ts + i * 2000, LocalResourceType.FILE, vis, null);
return req;
}

View File

@ -83,7 +83,7 @@ public class TestResourceRetention {
for (int i = 0; i < nRsrcs; ++i) {
final LocalResourceRequest req = new LocalResourceRequest(
new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC);
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, null);
final long ts = timestamp + i * tsstep;
final Path p = new Path("file:///local/" + user + "/rsrc" + i);
LocalizedResource rsrc = new LocalizedResource(req, null) {