MAPREDUCE-5263. Bring back old methods and fields in filecache.DistributedCache for binary compatibility with mapred in 1.x. Contributed by Zhijie Shen.

svn merge --ignore-ancestry -c 1487695 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1487696 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-05-30 01:24:15 +00:00
parent 2d8508120d
commit 6cb1cf3d1a
4 changed files with 305 additions and 2 deletions

View File

@ -104,6 +104,10 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-5270. Migrated MR app from using BuilderUtil factory methods to MAPREDUCE-5270. Migrated MR app from using BuilderUtil factory methods to
individual record factory methods. (Jian He via vinodkv) individual record factory methods. (Jian He via vinodkv)
MAPREDUCE-5263. Bring back old methods and fields in
filecache.DistributedCache for binary compatibility with mapred in 1.x.
(Zhijie Shen via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method MAPREDUCE-4974. Optimising the LineRecordReader initialize() method

View File

@ -27,12 +27,15 @@
import java.util.jar.JarOutputStream; import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import junit.framework.Assert;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -44,6 +47,7 @@
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.junit.Test;
/** /**
* Tests the use of the * Tests the use of the
@ -59,6 +63,7 @@
* *
* This test is not fast: it uses MiniMRCluster. * This test is not fast: it uses MiniMRCluster.
*/ */
@SuppressWarnings("deprecation")
public class TestMRWithDistributedCache extends TestCase { public class TestMRWithDistributedCache extends TestCase {
private static Path TEST_ROOT_DIR = private static Path TEST_ROOT_DIR =
new Path(System.getProperty("test.build.data","/tmp")); new Path(System.getProperty("test.build.data","/tmp"));
@ -187,4 +192,93 @@ private Path makeJar(Path p, int index) throws FileNotFoundException,
jos.close(); jos.close();
return p; return p;
} }
@Test (timeout = 1000)
public void testDeprecatedFunctions() throws Exception {
DistributedCache.addLocalArchives(conf, "Test Local Archives 1");
Assert.assertEquals("Test Local Archives 1",
conf.get(DistributedCache.CACHE_LOCALARCHIVES));
Assert.assertEquals(1,
DistributedCache.getLocalCacheArchives(conf).length);
Assert.assertEquals("Test Local Archives 1",
DistributedCache.getLocalCacheArchives(conf)[0].getName());
DistributedCache.addLocalArchives(conf, "Test Local Archives 2");
Assert.assertEquals("Test Local Archives 1,Test Local Archives 2",
conf.get(DistributedCache.CACHE_LOCALARCHIVES));
Assert.assertEquals(2,
DistributedCache.getLocalCacheArchives(conf).length);
Assert.assertEquals("Test Local Archives 2",
DistributedCache.getLocalCacheArchives(conf)[1].getName());
DistributedCache.setLocalArchives(conf, "Test Local Archives 3");
Assert.assertEquals("Test Local Archives 3",
conf.get(DistributedCache.CACHE_LOCALARCHIVES));
Assert.assertEquals(1,
DistributedCache.getLocalCacheArchives(conf).length);
Assert.assertEquals("Test Local Archives 3",
DistributedCache.getLocalCacheArchives(conf)[0].getName());
DistributedCache.addLocalFiles(conf, "Test Local Files 1");
Assert.assertEquals("Test Local Files 1",
conf.get(DistributedCache.CACHE_LOCALFILES));
Assert.assertEquals(1,
DistributedCache.getLocalCacheFiles(conf).length);
Assert.assertEquals("Test Local Files 1",
DistributedCache.getLocalCacheFiles(conf)[0].getName());
DistributedCache.addLocalFiles(conf, "Test Local Files 2");
Assert.assertEquals("Test Local Files 1,Test Local Files 2",
conf.get(DistributedCache.CACHE_LOCALFILES));
Assert.assertEquals(2,
DistributedCache.getLocalCacheFiles(conf).length);
Assert.assertEquals("Test Local Files 2",
DistributedCache.getLocalCacheFiles(conf)[1].getName());
DistributedCache.setLocalFiles(conf, "Test Local Files 3");
Assert.assertEquals("Test Local Files 3",
conf.get(DistributedCache.CACHE_LOCALFILES));
Assert.assertEquals(1,
DistributedCache.getLocalCacheFiles(conf).length);
Assert.assertEquals("Test Local Files 3",
DistributedCache.getLocalCacheFiles(conf)[0].getName());
DistributedCache.setArchiveTimestamps(conf, "1234567890");
Assert.assertEquals(1234567890,
conf.getLong(DistributedCache.CACHE_ARCHIVES_TIMESTAMPS, 0));
Assert.assertEquals(1,
DistributedCache.getArchiveTimestamps(conf).length);
Assert.assertEquals(1234567890,
Long.parseLong(DistributedCache.getArchiveTimestamps(conf)[0]));
DistributedCache.setFileTimestamps(conf, "1234567890");
Assert.assertEquals(1234567890,
conf.getLong(DistributedCache.CACHE_FILES_TIMESTAMPS, 0));
Assert.assertEquals(1,
DistributedCache.getFileTimestamps(conf).length);
Assert.assertEquals(1234567890,
Long.parseLong(DistributedCache.getFileTimestamps(conf)[0]));
DistributedCache.createAllSymlink(conf, new File("Test Job Cache Dir"),
new File("Test Work Dir"));
Assert.assertNull(conf.get(DistributedCache.CACHE_SYMLINK));
Assert.assertTrue(DistributedCache.getSymlink(conf));
Assert.assertTrue(symlinkFile.createNewFile());
FileStatus fileStatus =
DistributedCache.getFileStatus(conf, symlinkFile.toURI());
Assert.assertNotNull(fileStatus);
Assert.assertEquals(fileStatus.getModificationTime(),
DistributedCache.getTimestamp(conf, symlinkFile.toURI()));
Assert.assertTrue(symlinkFile.delete());
DistributedCache.addCacheArchive(symlinkFile.toURI(), conf);
Assert.assertEquals(symlinkFile.toURI().toString(),
conf.get(DistributedCache.CACHE_ARCHIVES));
Assert.assertEquals(1, DistributedCache.getCacheArchives(conf).length);
Assert.assertEquals(symlinkFile.toURI(),
DistributedCache.getCacheArchives(conf)[0]);
DistributedCache.addCacheFile(symlinkFile.toURI(), conf);
Assert.assertEquals(symlinkFile.toURI().toString(),
conf.get(DistributedCache.CACHE_FILES));
Assert.assertEquals(1, DistributedCache.getCacheFiles(conf).length);
Assert.assertEquals(symlinkFile.toURI(),
DistributedCache.getCacheFiles(conf)[0]);
}
} }

View File

@ -18,12 +18,17 @@
package org.apache.hadoop.filecache; package org.apache.hadoop.filecache;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRJobConfig;
/** /**
* Distribute application-specific large, read-only files efficiently. * Distribute application-specific large, read-only files efficiently.
@ -123,9 +128,204 @@
* @see org.apache.hadoop.mapred.JobClient * @see org.apache.hadoop.mapred.JobClient
* @see org.apache.hadoop.mapreduce.Job * @see org.apache.hadoop.mapreduce.Job
*/ */
@SuppressWarnings("deprecation")
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Stable @InterfaceStability.Stable
@Deprecated
public class DistributedCache extends public class DistributedCache extends
org.apache.hadoop.mapreduce.filecache.DistributedCache { org.apache.hadoop.mapreduce.filecache.DistributedCache {
// /**
* Warning: {@link #CACHE_FILES_SIZES} is not a *public* constant.
* The variable is kept for M/R 1.x applications, M/R 2.x applications should
* use {@link MRJobConfig#CACHE_FILES_SIZES}
*/
@Deprecated
public static final String CACHE_FILES_SIZES =
"mapred.cache.files.filesizes";
/**
* Warning: {@link #CACHE_ARCHIVES_SIZES} is not a *public* constant.
* The variable is kept for M/R 1.x applications, M/R 2.x applications should
* use {@link MRJobConfig#CACHE_ARCHIVES_SIZES}
*/
@Deprecated
public static final String CACHE_ARCHIVES_SIZES =
"mapred.cache.archives.filesizes";
/**
* Warning: {@link #CACHE_ARCHIVES_TIMESTAMPS} is not a *public* constant.
* The variable is kept for M/R 1.x applications, M/R 2.x applications should
* use {@link MRJobConfig#CACHE_ARCHIVES_TIMESTAMPS}
*/
@Deprecated
public static final String CACHE_ARCHIVES_TIMESTAMPS =
"mapred.cache.archives.timestamps";
/**
* Warning: {@link #CACHE_FILES_TIMESTAMPS} is not a *public* constant.
* The variable is kept for M/R 1.x applications, M/R 2.x applications should
* use {@link MRJobConfig#CACHE_FILE_TIMESTAMPS}
*/
@Deprecated
public static final String CACHE_FILES_TIMESTAMPS =
"mapred.cache.files.timestamps";
/**
* Warning: {@link #CACHE_ARCHIVES} is not a *public* constant.
* The variable is kept for M/R 1.x applications, M/R 2.x applications should
* use {@link MRJobConfig#CACHE_ARCHIVES}
*/
@Deprecated
public static final String CACHE_ARCHIVES = "mapred.cache.archives";
/**
* Warning: {@link #CACHE_FILES} is not a *public* constant.
* The variable is kept for M/R 1.x applications, M/R 2.x applications should
* use {@link MRJobConfig#CACHE_FILES}
*/
@Deprecated
public static final String CACHE_FILES = "mapred.cache.files";
/**
* Warning: {@link #CACHE_LOCALARCHIVES} is not a *public* constant.
* The variable is kept for M/R 1.x applications, M/R 2.x applications should
* use {@link MRJobConfig#CACHE_LOCALARCHIVES}
*/
@Deprecated
public static final String CACHE_LOCALARCHIVES =
"mapred.cache.localArchives";
/**
* Warning: {@link #CACHE_LOCALFILES} is not a *public* constant.
* The variable is kept for M/R 1.x applications, M/R 2.x applications should
* use {@link MRJobConfig#CACHE_LOCALFILES}
*/
@Deprecated
public static final String CACHE_LOCALFILES = "mapred.cache.localFiles";
/**
* Warning: {@link #CACHE_SYMLINK} is not a *public* constant.
* The variable is kept for M/R 1.x applications, M/R 2.x applications should
* use {@link MRJobConfig#CACHE_SYMLINK}
*/
@Deprecated
public static final String CACHE_SYMLINK = "mapred.create.symlink";
/**
* Add a archive that has been localized to the conf. Used
* by internal DistributedCache code.
* @param conf The conf to modify to contain the localized caches
* @param str a comma separated list of local archives
*/
@Deprecated
public static void addLocalArchives(Configuration conf, String str) {
String archives = conf.get(CACHE_LOCALARCHIVES);
conf.set(CACHE_LOCALARCHIVES, archives == null ? str
: archives + "," + str);
}
/**
* Add a file that has been localized to the conf.. Used
* by internal DistributedCache code.
* @param conf The conf to modify to contain the localized caches
* @param str a comma separated list of local files
*/
@Deprecated
public static void addLocalFiles(Configuration conf, String str) {
String files = conf.get(CACHE_LOCALFILES);
conf.set(CACHE_LOCALFILES, files == null ? str
: files + "," + str);
}
/**
* This method create symlinks for all files in a given dir in another
* directory. Currently symlinks cannot be disabled. This is a NO-OP.
*
* @param conf the configuration
* @param jobCacheDir the target directory for creating symlinks
* @param workDir the directory in which the symlinks are created
* @throws IOException
* @deprecated Internal to MapReduce framework. Use DistributedCacheManager
* instead.
*/
@Deprecated
public static void createAllSymlink(
Configuration conf, File jobCacheDir, File workDir)
throws IOException{
// Do nothing
}
/**
* Returns {@link FileStatus} of a given cache file on hdfs. Internal to
* MapReduce.
* @param conf configuration
* @param cache cache file
* @return <code>FileStatus</code> of a given cache file on hdfs
* @throws IOException
*/
@Deprecated
public static FileStatus getFileStatus(Configuration conf, URI cache)
throws IOException {
FileSystem fileSystem = FileSystem.get(cache, conf);
return fileSystem.getFileStatus(new Path(cache.getPath()));
}
/**
* Returns mtime of a given cache file on hdfs. Internal to MapReduce.
* @param conf configuration
* @param cache cache file
* @return mtime of a given cache file on hdfs
* @throws IOException
*/
@Deprecated
public static long getTimestamp(Configuration conf, URI cache)
throws IOException {
return getFileStatus(conf, cache).getModificationTime();
}
/**
* This is to check the timestamp of the archives to be localized.
* Used by internal MapReduce code.
* @param conf Configuration which stores the timestamp's
* @param timestamps comma separated list of timestamps of archives.
* The order should be the same as the order in which the archives are added.
*/
@Deprecated
public static void setArchiveTimestamps(Configuration conf, String timestamps) {
conf.set(CACHE_ARCHIVES_TIMESTAMPS, timestamps);
}
/**
* This is to check the timestamp of the files to be localized.
* Used by internal MapReduce code.
* @param conf Configuration which stores the timestamp's
* @param timestamps comma separated list of timestamps of files.
* The order should be the same as the order in which the files are added.
*/
@Deprecated
public static void setFileTimestamps(Configuration conf, String timestamps) {
conf.set(CACHE_FILES_TIMESTAMPS, timestamps);
}
/**
* Set the conf to contain the location for localized archives. Used
* by internal DistributedCache code.
* @param conf The conf to modify to contain the localized caches
* @param str a comma separated list of local archives
*/
@Deprecated
public static void setLocalArchives(Configuration conf, String str) {
conf.set(CACHE_LOCALARCHIVES, str);
}
/**
* Set the conf to contain the location for localized files. Used
* by internal DistributedCache code.
* @param conf The conf to modify to contain the localized caches
* @param str a comma separated list of local files
*/
@Deprecated
public static void setLocalFiles(Configuration conf, String str) {
conf.set(CACHE_LOCALFILES, str);
}
} }

View File

@ -47,6 +47,7 @@ public static void loadResources() {
/** /**
* Adds deprecated keys and the corresponding new keys to the Configuration * Adds deprecated keys and the corresponding new keys to the Configuration
*/ */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() { private static void addDeprecatedKeys() {
Configuration.addDeprecation("mapred.temp.dir", Configuration.addDeprecation("mapred.temp.dir",
new String[] {MRConfig.TEMP_DIR}); new String[] {MRConfig.TEMP_DIR});
@ -242,6 +243,10 @@ private static void addDeprecatedKeys() {
new String[] {MRJobConfig.CACHE_LOCALFILES}); new String[] {MRJobConfig.CACHE_LOCALFILES});
Configuration.addDeprecation("mapred.cache.localArchives", Configuration.addDeprecation("mapred.cache.localArchives",
new String[] {MRJobConfig.CACHE_LOCALARCHIVES}); new String[] {MRJobConfig.CACHE_LOCALARCHIVES});
Configuration.addDeprecation("mapred.cache.files.filesizes",
new String[] {MRJobConfig.CACHE_FILES_SIZES});
Configuration.addDeprecation("mapred.cache.archives.filesizes",
new String[] {MRJobConfig.CACHE_ARCHIVES_SIZES});
Configuration.addDeprecation("mapred.cache.files.timestamps", Configuration.addDeprecation("mapred.cache.files.timestamps",
new String[] {MRJobConfig.CACHE_FILE_TIMESTAMPS}); new String[] {MRJobConfig.CACHE_FILE_TIMESTAMPS});
Configuration.addDeprecation("mapred.cache.archives.timestamps", Configuration.addDeprecation("mapred.cache.archives.timestamps",