Merging r1539245 through r1539736 from trunk to branch HDFS-2832

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1539737 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-11-07 17:58:22 +00:00
commit 2af29af098
10 changed files with 247 additions and 49 deletions

View File

@ -431,6 +431,9 @@ Release 2.2.1 - UNRELEASED
HADOOP-10079. log a warning message if group resolution takes too long.
(cmccabe)
HADOOP-9623 Update jets3t dependency to 0.9.0. (Amandeep Khurana via Colin
Patrick McCabe)
OPTIMIZATIONS
BUG FIXES
@ -460,6 +463,10 @@ Release 2.2.1 - UNRELEASED
HADOOP-9478. Fix race conditions during the initialization of Configuration
related to deprecatedKeyMap (cmccabe)
HADOOP-9660. [WINDOWS] Powershell / cmd parses -Dkey=value from command line
as [-Dkey, value] which breaks GenericsOptionParser.
(Enis Soztutar via cnauroth)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.fs.s3.INode.FileType;
import org.jets3t.service.S3Service;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
@ -60,8 +61,8 @@ class Jets3tFileSystemStore implements FileSystemStore {
private static final String FILE_SYSTEM_VERSION_NAME = "fs-version";
private static final String FILE_SYSTEM_VERSION_VALUE = "1";
private static final Map<String, String> METADATA =
new HashMap<String, String>();
private static final Map<String, Object> METADATA =
new HashMap<String, Object>();
static {
METADATA.put(FILE_SYSTEM_NAME, FILE_SYSTEM_VALUE);
@ -165,7 +166,7 @@ private InputStream get(String key, boolean checkMetadata)
throws IOException {
try {
S3Object object = s3Service.getObject(bucket, key);
S3Object object = s3Service.getObject(bucket.getName(), key);
if (checkMetadata) {
checkMetadata(object);
}
@ -178,6 +179,9 @@ private InputStream get(String key, boolean checkMetadata)
throw (IOException) e.getCause();
}
throw new S3Exception(e);
} catch (ServiceException e) {
handleServiceException(e);
return null;
}
}
@ -194,6 +198,9 @@ private InputStream get(String key, long byteRangeStart) throws IOException {
throw (IOException) e.getCause();
}
throw new S3Exception(e);
} catch (ServiceException e) {
handleServiceException(e);
return null;
}
}
@ -276,7 +283,7 @@ public Set<Path> listSubPaths(Path path) throws IOException {
if (!prefix.endsWith(PATH_DELIMITER)) {
prefix += PATH_DELIMITER;
}
S3Object[] objects = s3Service.listObjects(bucket, prefix, PATH_DELIMITER);
S3Object[] objects = s3Service.listObjects(bucket.getName(), prefix, PATH_DELIMITER);
Set<Path> prefixes = new TreeSet<Path>();
for (int i = 0; i < objects.length; i++) {
prefixes.add(keyToPath(objects[i].getKey()));
@ -298,7 +305,7 @@ public Set<Path> listDeepSubPaths(Path path) throws IOException {
if (!prefix.endsWith(PATH_DELIMITER)) {
prefix += PATH_DELIMITER;
}
S3Object[] objects = s3Service.listObjects(bucket, prefix, null);
S3Object[] objects = s3Service.listObjects(bucket.getName(), prefix, null);
Set<Path> prefixes = new TreeSet<Path>();
for (int i = 0; i < objects.length; i++) {
prefixes.add(keyToPath(objects[i].getKey()));
@ -385,7 +392,7 @@ private boolean isRoot(String key) {
@Override
public void purge() throws IOException {
try {
S3Object[] objects = s3Service.listObjects(bucket);
S3Object[] objects = s3Service.listObjects(bucket.getName());
for (int i = 0; i < objects.length; i++) {
s3Service.deleteObject(bucket, objects[i].getKey());
}
@ -402,7 +409,7 @@ public void dump() throws IOException {
StringBuilder sb = new StringBuilder("S3 Filesystem, ");
sb.append(bucket.getName()).append("\n");
try {
S3Object[] objects = s3Service.listObjects(bucket, PATH_DELIMITER, null);
S3Object[] objects = s3Service.listObjects(bucket.getName(), PATH_DELIMITER, null);
for (int i = 0; i < objects.length; i++) {
Path path = keyToPath(objects[i].getKey());
sb.append(path).append("\n");
@ -424,4 +431,15 @@ public void dump() throws IOException {
System.out.println(sb);
}
private void handleServiceException(ServiceException e) throws IOException {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
else {
if(LOG.isDebugEnabled()) {
LOG.debug("Got ServiceException with Error code: " + e.getErrorCode() + ";and Error message: " + e.getErrorMessage());
}
}
}
}

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.util.ToolRunner;
import org.jets3t.service.S3Service;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
@ -177,7 +178,7 @@ private void migrate(Store oldStore, FileSystemStore newStore)
private S3Object get(String key) {
try {
return s3Service.getObject(bucket, key);
return s3Service.getObject(bucket.getName(), key);
} catch (S3ServiceException e) {
if ("NoSuchKey".equals(e.getS3ErrorCode())) {
return null;
@ -200,7 +201,7 @@ class UnversionedStore implements Store {
public Set<Path> listAllPaths() throws IOException {
try {
String prefix = urlEncode(Path.SEPARATOR);
S3Object[] objects = s3Service.listObjects(bucket, prefix, null);
S3Object[] objects = s3Service.listObjects(bucket.getName(), prefix, null);
Set<Path> prefixes = new TreeSet<Path>();
for (int i = 0; i < objects.length; i++) {
prefixes.add(keyToPath(objects[i].getKey()));
@ -237,7 +238,7 @@ public INode retrieveINode(Path path) throws IOException {
private InputStream get(String key) throws IOException {
try {
S3Object object = s3Service.getObject(bucket, key);
S3Object object = s3Service.getObject(bucket.getName(), key);
return object.getDataInputStream();
} catch (S3ServiceException e) {
if ("NoSuchKey".equals(e.getS3ErrorCode())) {
@ -247,6 +248,8 @@ private InputStream get(String key) throws IOException {
throw (IOException) e.getCause();
}
throw new S3Exception(e);
} catch (ServiceException e) {
return null;
}
}

View File

@ -29,17 +29,21 @@
import java.io.InputStream;
import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3.S3Credentials;
import org.apache.hadoop.fs.s3.S3Exception;
import org.jets3t.service.S3ObjectsChunk;
import org.jets3t.service.S3Service;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.StorageObjectsChunk;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.model.StorageObject;
import org.jets3t.service.security.AWSCredentials;
@InterfaceAudience.Private
@ -48,6 +52,8 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
private S3Service s3Service;
private S3Bucket bucket;
public static final Log LOG =
LogFactory.getLog(Jets3tNativeFileSystemStore.class);
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
@ -59,7 +65,7 @@ public void initialize(URI uri, Configuration conf) throws IOException {
s3Credentials.getSecretAccessKey());
this.s3Service = new RestS3Service(awsCredentials);
} catch (S3ServiceException e) {
handleServiceException(e);
handleS3ServiceException(e);
}
bucket = new S3Bucket(uri.getHost());
}
@ -80,7 +86,7 @@ public void storeFile(String key, File file, byte[] md5Hash)
}
s3Service.putObject(bucket, object);
} catch (S3ServiceException e) {
handleServiceException(e);
handleS3ServiceException(e);
} finally {
if (in != null) {
try {
@ -101,47 +107,79 @@ public void storeEmptyFile(String key) throws IOException {
object.setContentLength(0);
s3Service.putObject(bucket, object);
} catch (S3ServiceException e) {
handleServiceException(e);
handleS3ServiceException(e);
}
}
@Override
public FileMetadata retrieveMetadata(String key) throws IOException {
try {
S3Object object = s3Service.getObjectDetails(bucket, key);
if(LOG.isDebugEnabled()) {
LOG.debug("Getting metadata for key: " + key + " from bucket:" + bucket.getName());
}
S3Object object = s3Service.getObject(bucket.getName(), key);
return new FileMetadata(key, object.getContentLength(),
object.getLastModifiedDate().getTime());
} catch (S3ServiceException e) {
// Following is brittle. Is there a better way?
if (e.getMessage().contains("ResponseCode=404")) {
return null;
if (e.getS3ErrorCode().matches("NoSuchKey")) {
return null; //return null if key not found
}
handleServiceException(e);
handleS3ServiceException(e);
return null; //never returned - keep compiler happy
}
}
/**
* @param key
* The key is the object name that is being retrieved from the S3 bucket
* @return
* This method returns null if the key is not found
* @throws IOException
*/
@Override
public InputStream retrieve(String key) throws IOException {
try {
S3Object object = s3Service.getObject(bucket, key);
if(LOG.isDebugEnabled()) {
LOG.debug("Getting key: " + key + " from bucket:" + bucket.getName());
}
S3Object object = s3Service.getObject(bucket.getName(), key);
return object.getDataInputStream();
} catch (S3ServiceException e) {
handleServiceException(key, e);
handleS3ServiceException(key, e);
return null; //never returned - keep compiler happy
} catch (ServiceException e) {
handleServiceException(e);
return null; //return null if key not found
}
}
/**
*
* @param key
* The key is the object name that is being retrieved from the S3 bucket
* @return
* This method returns null if the key is not found
* @throws IOException
*/
@Override
public InputStream retrieve(String key, long byteRangeStart)
throws IOException {
try {
if(LOG.isDebugEnabled()) {
LOG.debug("Getting key: " + key + " from bucket:" + bucket.getName() + " with byteRangeStart: " + byteRangeStart);
}
S3Object object = s3Service.getObject(bucket, key, null, null, null,
null, byteRangeStart, null);
return object.getDataInputStream();
} catch (S3ServiceException e) {
handleServiceException(key, e);
handleS3ServiceException(key, e);
return null; //never returned - keep compiler happy
} catch (ServiceException e) {
handleServiceException(e);
return null; //return null if key not found
}
}
@ -158,6 +196,13 @@ public PartialListing list(String prefix, int maxListingLength, String priorLast
return list(prefix, recurse ? null : PATH_DELIMITER, maxListingLength, priorLastKey);
}
/**
*
* @return
* This method returns null if the list could not be populated
* due to S3 giving ServiceException
* @throws IOException
*/
private PartialListing list(String prefix, String delimiter,
int maxListingLength, String priorLastKey) throws IOException {
@ -165,52 +210,63 @@ private PartialListing list(String prefix, String delimiter,
if (prefix.length() > 0 && !prefix.endsWith(PATH_DELIMITER)) {
prefix += PATH_DELIMITER;
}
S3ObjectsChunk chunk = s3Service.listObjectsChunked(bucket.getName(),
StorageObjectsChunk chunk = s3Service.listObjectsChunked(bucket.getName(),
prefix, delimiter, maxListingLength, priorLastKey);
FileMetadata[] fileMetadata =
new FileMetadata[chunk.getObjects().length];
for (int i = 0; i < fileMetadata.length; i++) {
S3Object object = chunk.getObjects()[i];
StorageObject object = chunk.getObjects()[i];
fileMetadata[i] = new FileMetadata(object.getKey(),
object.getContentLength(), object.getLastModifiedDate().getTime());
}
return new PartialListing(chunk.getPriorLastKey(), fileMetadata,
chunk.getCommonPrefixes());
} catch (S3ServiceException e) {
handleServiceException(e);
handleS3ServiceException(e);
return null; //never returned - keep compiler happy
} catch (ServiceException e) {
handleServiceException(e);
return null; //return null if list could not be populated
}
}
@Override
public void delete(String key) throws IOException {
try {
if(LOG.isDebugEnabled()) {
LOG.debug("Deleting key:" + key + "from bucket" + bucket.getName());
}
s3Service.deleteObject(bucket, key);
} catch (S3ServiceException e) {
handleServiceException(key, e);
handleS3ServiceException(key, e);
}
}
@Override
public void copy(String srcKey, String dstKey) throws IOException {
try {
if(LOG.isDebugEnabled()) {
LOG.debug("Copying srcKey: " + srcKey + "to dstKey: " + dstKey + "in bucket: " + bucket.getName());
}
s3Service.copyObject(bucket.getName(), srcKey, bucket.getName(),
new S3Object(dstKey), false);
} catch (S3ServiceException e) {
handleServiceException(srcKey, e);
handleS3ServiceException(srcKey, e);
} catch (ServiceException e) {
handleServiceException(e);
}
}
@Override
public void purge(String prefix) throws IOException {
try {
S3Object[] objects = s3Service.listObjects(bucket, prefix, null);
S3Object[] objects = s3Service.listObjects(bucket.getName(), prefix, null);
for (S3Object object : objects) {
s3Service.deleteObject(bucket, object.getKey());
}
} catch (S3ServiceException e) {
handleServiceException(e);
handleS3ServiceException(e);
}
}
@ -219,30 +275,44 @@ public void dump() throws IOException {
StringBuilder sb = new StringBuilder("S3 Native Filesystem, ");
sb.append(bucket.getName()).append("\n");
try {
S3Object[] objects = s3Service.listObjects(bucket);
S3Object[] objects = s3Service.listObjects(bucket.getName());
for (S3Object object : objects) {
sb.append(object.getKey()).append("\n");
}
} catch (S3ServiceException e) {
handleServiceException(e);
handleS3ServiceException(e);
}
System.out.println(sb);
}
private void handleServiceException(String key, S3ServiceException e) throws IOException {
private void handleS3ServiceException(String key, S3ServiceException e) throws IOException {
if ("NoSuchKey".equals(e.getS3ErrorCode())) {
throw new FileNotFoundException("Key '" + key + "' does not exist in S3");
} else {
handleServiceException(e);
handleS3ServiceException(e);
}
}
private void handleServiceException(S3ServiceException e) throws IOException {
private void handleS3ServiceException(S3ServiceException e) throws IOException {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
else {
if(LOG.isDebugEnabled()) {
LOG.debug("S3 Error code: " + e.getS3ErrorCode() + "; S3 Error message: " + e.getS3ErrorMessage());
}
throw new S3Exception(e);
}
}
private void handleServiceException(ServiceException e) throws IOException {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
else {
if(LOG.isDebugEnabled()) {
LOG.debug("Got ServiceException with Error code: " + e.getErrorCode() + ";and Error message: " + e.getErrorMessage());
}
}
}
}

View File

@ -273,7 +273,7 @@ public void initialize(URI uri, Configuration conf) throws IOException {
setConf(conf);
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
this.workingDir =
new Path("/user", System.getProperty("user.name")).makeQualified(this);
new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory());
}
private static NativeFileSystemStore createDefaultStore(Configuration conf) {
@ -511,11 +511,11 @@ else if (relativePath.endsWith(FOLDER_SUFFIX)) {
private FileStatus newFile(FileMetadata meta, Path path) {
return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
meta.getLastModified(), path.makeQualified(this));
meta.getLastModified(), path.makeQualified(this.getUri(), this.getWorkingDirectory()));
}
private FileStatus newDirectory(Path path) {
return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this));
return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this.getUri(), this.getWorkingDirectory()));
}
@Override

View File

@ -243,6 +243,9 @@ Release 2.2.1 - UNRELEASED
MAPREDUCE-5604. TestMRAMWithNonNormalizedCapabilities fails on Windows due to
exceeding max path length. (cnauroth)
MAPREDUCE-5451. MR uses LD_LIBRARY_PATH which doesn't mean anything in
Windows. (Yingda Chen via cnauroth)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES

View File

@ -575,7 +575,9 @@ public interface MRJobConfig {
public static final String MAPRED_ADMIN_USER_ENV =
"mapreduce.admin.user.env";
public static final String DEFAULT_MAPRED_ADMIN_USER_ENV =
public final String DEFAULT_MAPRED_ADMIN_USER_ENV =
Shell.WINDOWS ?
"PATH=%PATH%;%HADOOP_COMMON_HOME%\\bin":
"LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib/native";
public static final String WORKDIR = "work";

View File

@ -183,11 +183,16 @@
<property>
<name>mapreduce.admin.user.env</name>
<value>LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib/native</value>
<description>Expert: Additional execution environment entries for
<value></value>
<description>
Expert: Additional execution environment entries for
map and reduce task processes. This is not an additive property.
You must preserve the original value if you want your map and
reduce tasks to have access to native libraries (compression, etc).
When this value is empty, the command to set execution
envrionment will be OS dependent:
For linux, use LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib/native.
For windows, use PATH = %PATH%;%HADOOP_COMMON_HOME%\\bin.
</description>
</property>

View File

@ -23,7 +23,9 @@
import static org.junit.Assert.fail;
import java.io.*;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -46,6 +48,7 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.util.Apps;
/**
* Class to test mapred task's
@ -108,6 +111,29 @@ public void configure(JobConf job) {
}
}
/**
* Map class which checks if hadoop lib location
* is in the execution path
*/
public static class ExecutionEnvCheckMapClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
public void map (LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
}
public void configure(JobConf job) {
String executionEnvPathVariable = System.getenv(Shell.WINDOWS ? "PATH"
: "LD_LIBRARY_PATH");
String hadoopHome = System.getenv("HADOOP_COMMON_HOME");
if (hadoopHome == null) {
hadoopHome = "";
}
String hadoopLibLocation = hadoopHome
+ (Shell.WINDOWS ? "\\bin" : "/lib/native");
assertTrue(executionEnvPathVariable.contains(hadoopLibLocation));
}
}
// configure a job
private void configure(JobConf conf, Path inDir, Path outDir, String input,
Class<? extends Mapper> map,
@ -153,8 +179,6 @@ public void launchTest(JobConf conf,
Path outDir,
String input)
throws IOException, InterruptedException, ClassNotFoundException {
configure(conf, inDir, outDir, input,
MapClass.class, IdentityReducer.class);
FileSystem outFs = outDir.getFileSystem(conf);
@ -359,7 +383,8 @@ public void testTaskTempDir(){
Path inDir = new Path("testing/wc/input");
Path outDir = new Path("testing/wc/output");
String input = "The input";
configure(conf, inDir, outDir, input,
MapClass.class, IdentityReducer.class);
launchTest(conf, inDir, outDir, input);
} catch(Exception e) {
@ -369,6 +394,66 @@ public void testTaskTempDir(){
}
}
/**
* To test OS dependent setting of default execution path for a MapRed task.
* Mainly that we can use MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV to set -
* for WINDOWS: %HADOOP_COMMON_HOME%\bin is expected to be included in PATH - for
* Linux: $HADOOP_COMMON_HOME/lib/native is expected to be included in
* LD_LIBRARY_PATH
*/
@Test
public void testMapRedExecutionEnv() {
// test if the env variable can be set
try {
// Application environment
Map<String, String> environment = new HashMap<String, String>();
String setupHadoopHomeCommand = Shell.WINDOWS ?
"HADOOP_COMMON_HOME=C:\\fake\\PATH\\to\\hadoop\\common\\home" :
"HADOOP_COMMON_HOME=/fake/path/to/hadoop/common/home";
Apps.setEnvFromInputString(environment, setupHadoopHomeCommand);
// Add the env variables passed by the admin
Apps.setEnvFromInputString(environment, conf.get(
MRJobConfig.MAPRED_ADMIN_USER_ENV,
MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV));
String executionPaths = environment.get(
Shell.WINDOWS ? "PATH" : "LD_LIBRARY_PATH");
String toFind = Shell.WINDOWS ?
"C:\\fake\\PATH\\to\\hadoop\\common\\home\\bin" :
"/fake/path/to/hadoop/common/home/lib/native";
// Ensure execution PATH/LD_LIBRARY_PATH set up pointing to hadoop lib
assertTrue("execution path does not include the hadoop lib location "
+ toFind, executionPaths.contains(toFind));
} catch (Exception e) {
e.printStackTrace();
fail("Exception in testing execution environment for MapReduce task");
tearDown();
}
// now launch a mapreduce job to ensure that the child
// also gets the configured setting for hadoop lib
try {
JobConf conf = new JobConf(mr.getConfig());
// initialize input, output directories
Path inDir = new Path("input");
Path outDir = new Path("output");
String input = "The input";
// set config to use the ExecutionEnvCheckMapClass map class
configure(conf, inDir, outDir, input,
ExecutionEnvCheckMapClass.class, IdentityReducer.class);
launchTest(conf, inDir, outDir, input);
} catch(Exception e) {
e.printStackTrace();
fail("Exception in testing propagation of env setting to child task");
tearDown();
}
}
/**
* Test to test if the user set env variables reflect in the child
* processes. Mainly

View File

@ -347,6 +347,11 @@
<artifactId>httpclient</artifactId>
<version>4.2.5</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.2.5</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
@ -554,7 +559,7 @@
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<version>0.6.1</version>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>