MAPREDUCE-5300. Fix backward incompatibility for o.a.h.mapreduce.filecache.DistributedCache. Contributed by Zhijie Shen.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1489227 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1caf3e363f
commit
78331cfeca
|
@ -487,6 +487,9 @@ Release 2.1.0-beta - UNRELEASED
|
||||||
MAPREDUCE-5296. Fix backward incompatibility for JobControl by adding the
|
MAPREDUCE-5296. Fix backward incompatibility for JobControl by adding the
|
||||||
omitted addJob. (Zhijie Shen via acmurthy)
|
omitted addJob. (Zhijie Shen via acmurthy)
|
||||||
|
|
||||||
|
MAPREDUCE-5300. Fix backward incompatibility for
|
||||||
|
o.a.h.mapreduce.filecache.DistributedCache. (Zhijie Shen via acmurthy)
|
||||||
|
|
||||||
BREAKDOWN OF HADOOP-8562 SUBTASKS
|
BREAKDOWN OF HADOOP-8562 SUBTASKS
|
||||||
|
|
||||||
MAPREDUCE-4739. Some MapReduce tests fail to find winutils.
|
MAPREDUCE-4739. Some MapReduce tests fail to find winutils.
|
||||||
|
|
|
@ -336,17 +336,6 @@ public class MRApps extends Apps {
|
||||||
return startCommitFile;
|
return startCommitFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static long[] parseTimeStamps(String[] strs) {
|
|
||||||
if (null == strs) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
long[] result = new long[strs.length];
|
|
||||||
for(int i=0; i < strs.length; ++i) {
|
|
||||||
result[i] = Long.parseLong(strs[i]);
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void setupDistributedCache(
|
public static void setupDistributedCache(
|
||||||
Configuration conf,
|
Configuration conf,
|
||||||
Map<String, LocalResource> localResources)
|
Map<String, LocalResource> localResources)
|
||||||
|
@ -356,7 +345,7 @@ public class MRApps extends Apps {
|
||||||
parseDistributedCacheArtifacts(conf, localResources,
|
parseDistributedCacheArtifacts(conf, localResources,
|
||||||
LocalResourceType.ARCHIVE,
|
LocalResourceType.ARCHIVE,
|
||||||
DistributedCache.getCacheArchives(conf),
|
DistributedCache.getCacheArchives(conf),
|
||||||
parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)),
|
DistributedCache.getArchiveTimestamps(conf),
|
||||||
getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
|
getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
|
||||||
DistributedCache.getArchiveVisibilities(conf));
|
DistributedCache.getArchiveVisibilities(conf));
|
||||||
|
|
||||||
|
@ -365,7 +354,7 @@ public class MRApps extends Apps {
|
||||||
localResources,
|
localResources,
|
||||||
LocalResourceType.FILE,
|
LocalResourceType.FILE,
|
||||||
DistributedCache.getCacheFiles(conf),
|
DistributedCache.getCacheFiles(conf),
|
||||||
parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
|
DistributedCache.getFileTimestamps(conf),
|
||||||
getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
|
getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
|
||||||
DistributedCache.getFileVisibilities(conf));
|
DistributedCache.getFileVisibilities(conf));
|
||||||
}
|
}
|
||||||
|
|
|
@ -245,14 +245,14 @@ public class TestMRWithDistributedCache extends TestCase {
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
DistributedCache.getArchiveTimestamps(conf).length);
|
DistributedCache.getArchiveTimestamps(conf).length);
|
||||||
Assert.assertEquals(1234567890,
|
Assert.assertEquals(1234567890,
|
||||||
Long.parseLong(DistributedCache.getArchiveTimestamps(conf)[0]));
|
DistributedCache.getArchiveTimestamps(conf)[0]);
|
||||||
DistributedCache.setFileTimestamps(conf, "1234567890");
|
DistributedCache.setFileTimestamps(conf, "1234567890");
|
||||||
Assert.assertEquals(1234567890,
|
Assert.assertEquals(1234567890,
|
||||||
conf.getLong(DistributedCache.CACHE_FILES_TIMESTAMPS, 0));
|
conf.getLong(DistributedCache.CACHE_FILES_TIMESTAMPS, 0));
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
DistributedCache.getFileTimestamps(conf).length);
|
DistributedCache.getFileTimestamps(conf).length);
|
||||||
Assert.assertEquals(1234567890,
|
Assert.assertEquals(1234567890,
|
||||||
Long.parseLong(DistributedCache.getFileTimestamps(conf)[0]));
|
DistributedCache.getFileTimestamps(conf)[0]);
|
||||||
|
|
||||||
DistributedCache.createAllSymlink(conf, new File("Test Job Cache Dir"),
|
DistributedCache.createAllSymlink(conf, new File("Test Job Cache Dir"),
|
||||||
new File("Test Work Dir"));
|
new File("Test Work Dir"));
|
||||||
|
|
|
@ -214,17 +214,34 @@ public class DistributedCache {
|
||||||
return StringUtils.stringToPath(conf.getStrings(MRJobConfig.CACHE_LOCALFILES));
|
return StringUtils.stringToPath(conf.getStrings(MRJobConfig.CACHE_LOCALFILES));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parse a list of strings into longs.
|
||||||
|
* @param strs the list of strings to parse
|
||||||
|
* @return a list of longs that were parsed. same length as strs.
|
||||||
|
*/
|
||||||
|
private static long[] parseTimestamps(String[] strs) {
|
||||||
|
if (strs == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
long[] result = new long[strs.length];
|
||||||
|
for(int i=0; i < strs.length; ++i) {
|
||||||
|
result[i] = Long.parseLong(strs[i]);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the timestamps of the archives. Used by internal
|
* Get the timestamps of the archives. Used by internal
|
||||||
* DistributedCache and MapReduce code.
|
* DistributedCache and MapReduce code.
|
||||||
* @param conf The configuration which stored the timestamps
|
* @param conf The configuration which stored the timestamps
|
||||||
* @return a string array of timestamps
|
* @return a long array of timestamps
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
* @deprecated Use {@link JobContext#getArchiveTimestamps()} instead
|
* @deprecated Use {@link JobContext#getArchiveTimestamps()} instead
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static String[] getArchiveTimestamps(Configuration conf) {
|
public static long[] getArchiveTimestamps(Configuration conf) {
|
||||||
return conf.getStrings(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS);
|
return parseTimestamps(
|
||||||
|
conf.getStrings(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -232,13 +249,14 @@ public class DistributedCache {
|
||||||
* Get the timestamps of the files. Used by internal
|
* Get the timestamps of the files. Used by internal
|
||||||
* DistributedCache and MapReduce code.
|
* DistributedCache and MapReduce code.
|
||||||
* @param conf The configuration which stored the timestamps
|
* @param conf The configuration which stored the timestamps
|
||||||
* @return a string array of timestamps
|
* @return a long array of timestamps
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
* @deprecated Use {@link JobContext#getFileTimestamps()} instead
|
* @deprecated Use {@link JobContext#getFileTimestamps()} instead
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static String[] getFileTimestamps(Configuration conf) {
|
public static long[] getFileTimestamps(Configuration conf) {
|
||||||
return conf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS);
|
return parseTimestamps(
|
||||||
|
conf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -343,6 +343,22 @@ public class JobContextImpl implements JobContext {
|
||||||
return DistributedCache.getFileClassPaths(conf);
|
return DistributedCache.getFileClassPaths(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parse a list of longs into strings.
|
||||||
|
* @param timestamps the list of longs to parse
|
||||||
|
* @return a list of string that were parsed. same length as timestamps.
|
||||||
|
*/
|
||||||
|
private static String[] toTimestampStrs(long[] timestamps) {
|
||||||
|
if (timestamps == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
String[] result = new String[timestamps.length];
|
||||||
|
for(int i=0; i < timestamps.length; ++i) {
|
||||||
|
result[i] = Long.toString(timestamps[i]);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the timestamps of the archives. Used by internal
|
* Get the timestamps of the archives. Used by internal
|
||||||
* DistributedCache and MapReduce code.
|
* DistributedCache and MapReduce code.
|
||||||
|
@ -350,7 +366,7 @@ public class JobContextImpl implements JobContext {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public String[] getArchiveTimestamps() {
|
public String[] getArchiveTimestamps() {
|
||||||
return DistributedCache.getArchiveTimestamps(conf);
|
return toTimestampStrs(DistributedCache.getArchiveTimestamps(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -360,7 +376,7 @@ public class JobContextImpl implements JobContext {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public String[] getFileTimestamps() {
|
public String[] getFileTimestamps() {
|
||||||
return DistributedCache.getFileTimestamps(conf);
|
return toTimestampStrs(DistributedCache.getFileTimestamps(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue