Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1233105 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 2012-01-18 23:02:17 +00:00
commit 6177bf6c8f
342 changed files with 852 additions and 129 deletions

View File

@@ -276,6 +276,9 @@ Release 0.23.1 - Unreleased
HADOOP-7974. TestViewFsTrash incorrectly determines the user's home
directory. (harsh via eli)
HADOOP-7971. Adding back job/pipes/queue commands to bin/hadoop for
backward compatibility. (Prashath Sharma via acmurthy)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@@ -61,8 +61,24 @@ case $COMMAND in
elif [ -f "${HADOOP_PREFIX}"/bin/hdfs ]; then
exec "${HADOOP_PREFIX}"/bin/hdfs $*
else
echo "HDFS not found."
exit
echo "HADOOP_HDFS_HOME not found!"
exit 1
fi
;;
#mapred commands for backwards compatibility
pipes|job|queue)
echo "DEPRECATED: Use of this script to execute mapred command is deprecated." 1>&2
echo "Instead use the mapred command for it." 1>&2
echo "" 1>&2
#try to locate mapred and if present, delegate to it.
if [ -f "${HADOOP_MAPRED_HOME}"/bin/mapred ]; then
exec "${HADOOP_MAPRED_HOME}"/bin/mapred $*
elif [ -f "${HADOOP_PREFIX}"/bin/mapred ]; then
exec "${HADOOP_PREFIX}"/bin/mapred $*
else
echo "HADOOP_MAPRED_HOME not found!"
exit 1
fi
;;

View File

@@ -61,10 +61,6 @@ Trunk (unreleased changes)
HDFS-2337. DFSClient shouldn't keep multiple RPC proxy references (atm)
HDFS-362. FSEditLog should not write long and short as UTF8, and should
not use ArrayWritable for writing non-array items. (Uma Maheswara Rao G
via szetszwo)
HDFS-2351. Change Namenode and Datanode to register each of their protocols
separately. (Sanjay Radia)
@@ -266,6 +262,10 @@ Release 0.23.1 - UNRELEASED
HDFS-2788. HdfsServerConstants#DN_KEEPALIVE_TIMEOUT is dead code (eli)
HDFS-362. FSEditLog should not write long and short as UTF8, and should
not use ArrayWritable for writing non-array items. (Uma Maheswara Rao G
via szetszwo)
OPTIMIZATIONS
HDFS-2130. Switch default checksum to CRC32C. (todd)

View File

@@ -183,6 +183,10 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3553. Add support for data returned when exceptions are thrown from
web service APIs to be in either XML or JSON. (Thomas Graves via mahadev)
MAPREDUCE-3641. Making CapacityScheduler more conservative so as to
assign only one off-switch container in a single scheduling
iteration. (Arun C Murthy via vinodkv)
OPTIMIZATIONS
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
@@ -499,6 +503,18 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3657. State machine visualize build fails. (Jason Lowe
via mahadev)
MAPREDUCE-2450. Fixed a corner case with interrupted communication threads
leading to a long timeout in Task. (Rajesh Balamohan via acmurthy)
MAPREDUCE-3669. Allow clients to talk to MR HistoryServer using both
delegation tokens and kerberos. (mahadev via acmurthy)
MAPREDUCE-3684. LocalDistributedCacheManager does not shut down its thread
pool (tomwhite)
MAPREDUCE-3582. Move successfully passing MR1 tests to MR2 maven tree.
(ahmed via tucu)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@@ -39,6 +39,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -57,6 +58,8 @@
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.FSDownload;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* A helper class for managing the distributed cache for {@link LocalJobRunner}.
*/
@@ -111,43 +114,52 @@ public void setup(JobConf conf) throws IOException {
FileContext localFSFileContext = FileContext.getLocalFSFileContext();
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
ExecutorService exec = Executors.newCachedThreadPool();
Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
for (LocalResource resource : localResources.values()) {
Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
destPath, resource, new Random());
Future<Path> future = exec.submit(download);
resourcesToPaths.put(resource, future);
}
for (LocalResource resource : localResources.values()) {
Path path;
try {
path = resourcesToPaths.get(resource).get();
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e);
ExecutorService exec = null;
try {
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("LocalDistributedCacheManager Downloader #%d")
.build();
exec = Executors.newCachedThreadPool(tf);
Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
for (LocalResource resource : localResources.values()) {
Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
destPath, resource, new Random());
Future<Path> future = exec.submit(download);
resourcesToPaths.put(resource, future);
}
String pathString = path.toUri().toString();
if (resource.getType() == LocalResourceType.ARCHIVE) {
localArchives.add(pathString);
} else if (resource.getType() == LocalResourceType.FILE) {
localFiles.add(pathString);
for (LocalResource resource : localResources.values()) {
Path path;
try {
path = resourcesToPaths.get(resource).get();
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e);
}
String pathString = path.toUri().toString();
if (resource.getType() == LocalResourceType.ARCHIVE) {
localArchives.add(pathString);
} else if (resource.getType() == LocalResourceType.FILE) {
localFiles.add(pathString);
}
Path resourcePath;
try {
resourcePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
} catch (URISyntaxException e) {
throw new IOException(e);
}
LOG.info(String.format("Localized %s as %s", resourcePath, path));
String cp = resourcePath.toUri().getPath();
if (classpaths.keySet().contains(cp)) {
localClasspaths.add(path.toUri().getPath().toString());
}
}
Path resourcePath;
try {
resourcePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
} catch (URISyntaxException e) {
throw new IOException(e);
} finally {
if (exec != null) {
exec.shutdown();
}
LOG.info(String.format("Localized %s as %s", resourcePath, path));
String cp = resourcePath.toUri().getPath();
if (classpaths.keySet().contains(cp)) {
localClasspaths.add(path.toUri().getPath().toString());
}
}
}
// Update the configuration object with localized data.
if (!localArchives.isEmpty()) {
conf.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
@@ -171,7 +183,7 @@ public void setup(JobConf conf) throws IOException {
}
setupCalled = true;
}
/**
* Are there resources that should be added to the classpath?
* Should be called after setup().
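
The change above is the MAPREDUCE-3684 fix listed in CHANGES.txt: the parallel FSDownload submission is wrapped in try/finally so the cached thread pool is always shut down, and the pool's threads get a recognizable name for stack dumps. A minimal standalone sketch of the same pattern, with hypothetical names and only Guava as a dependency:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class DownloadPoolSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService exec = null;
    try {
      // Named threads make the pool identifiable in jstack output.
      ThreadFactory tf = new ThreadFactoryBuilder()
          .setNameFormat("Downloader #%d")
          .build();
      exec = Executors.newCachedThreadPool(tf);
      List<Future<String>> futures = new ArrayList<Future<String>>();
      for (int i = 0; i < 4; i++) {
        final int id = i;
        futures.add(exec.submit(new Callable<String>() {
          @Override
          public String call() {
            return "resource-" + id; // stand-in for FSDownload.call()
          }
        }));
      }
      for (Future<String> f : futures) {
        System.out.println("Localized " + f.get());
      }
    } finally {
      // The essence of the fix: release the pool even when a download fails.
      if (exec != null) {
        exec.shutdown();
      }
    }
  }
}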

View File

@@ -28,6 +28,7 @@
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -61,6 +62,8 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** Implements MapReduce locally, in-process, for debugging. */
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -302,7 +305,10 @@ protected ExecutorService createMapExecutor(int numMapTasks) {
LOG.debug("Map tasks to process: " + this.numMapTasks);
// Create a new executor service to drain the work queue.
ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads);
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("LocalJobRunner Map Task Executor #%d")
.build();
ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads, tf);
return executor;
}
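
The only functional change in this hunk is that the map-task executor's threads now carry a descriptive name. For readers unfamiliar with Guava, the ThreadFactoryBuilder call above behaves roughly like this hand-rolled factory (a sketch; the real builder also delegates thread creation to Executors.defaultThreadFactory()):

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

class NamedThreadFactory implements ThreadFactory {
  // e.g. "LocalJobRunner Map Task Executor #%d"
  private final String nameFormat;
  private final AtomicLong count = new AtomicLong(0);

  NamedThreadFactory(String nameFormat) {
    this.nameFormat = nameFormat;
  }

  @Override
  public Thread newThread(Runnable r) {
    Thread t = new Thread(r);
    t.setName(String.format(nameFormat, count.getAndIncrement()));
    return t;
  }
}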

View File

@@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api;
public interface HSClientProtocol extends MRClientProtocol {
}

View File

@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api.impl.pb.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
public class HSClientProtocolPBClientImpl extends MRClientProtocolPBClientImpl
implements HSClientProtocol {
public HSClientProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
super(clientVersion, addr, conf);
}
}

View File

@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api.impl.pb.service;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.yarn.proto.HSClientProtocol.HSClientProtocolService.BlockingInterface;
public class HSClientProtocolPBServiceImpl extends MRClientProtocolPBServiceImpl
implements BlockingInterface {
public HSClientProtocolPBServiceImpl(HSClientProtocol impl) {
super(impl);
}
}

View File

@@ -27,14 +27,14 @@
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.yarn.proto.MRClientProtocol;
import org.apache.hadoop.yarn.proto.HSClientProtocol;
public class ClientHSSecurityInfo extends SecurityInfo {
@Override
public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
if (!protocol
.equals(MRClientProtocol.MRClientProtocolService.BlockingInterface.class)) {
.equals(HSClientProtocol.HSClientProtocolService.BlockingInterface.class)) {
return null;
}
return new KerberosInfo() {
@@ -59,7 +59,7 @@ public String clientPrincipal() {
@Override
public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
if (!protocol
.equals(MRClientProtocol.MRClientProtocolService.BlockingInterface.class)) {
.equals(HSClientProtocol.HSClientProtocolService.BlockingInterface.class)) {
return null;
}
return new TokenInfo() {

View File

@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.proto;
/**
* Fake protocol to differentiate the blocking interfaces in the
* security info class loaders.
*/
public interface HSClientProtocol {
public abstract class HSClientProtocolService {
public interface BlockingInterface extends
MRClientProtocol.MRClientProtocolService.BlockingInterface {
}
}
}
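
As the javadoc notes, HSClientProtocolService.BlockingInterface adds no methods of its own; it exists so that KerberosInfo/TokenInfo lookups, which are keyed on the protocol class, can resolve differently for the history server than for the MR application master. A toy illustration of the mechanism, with hypothetical names:

// Two protocol classes with identical methods can carry different security
// metadata because the lookup is keyed on the class object itself.
interface ClientProtocol {
  String getReport();
}

interface HistoryClientProtocol extends ClientProtocol {
  // marker only: no new methods
}

public class SecurityLookupSketch {
  static String principalFor(Class<?> protocol) {
    if (HistoryClientProtocol.class.equals(protocol)) {
      return "history/_HOST"; // hypothetical history-server principal
    } else if (ClientProtocol.class.equals(protocol)) {
      return "am/_HOST";      // hypothetical application-master principal
    }
    return null;              // unknown protocol: no Kerberos info
  }

  public static void main(String[] args) {
    System.out.println(principalFor(HistoryClientProtocol.class)); // history/_HOST
    System.out.println(principalFor(ClientProtocol.class));        // am/_HOST
  }
}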

View File

@@ -22,6 +22,7 @@ option java_generic_services = true;
import "mr_service_protos.proto";
/* If making changes to this, please edit HSClientProtocolService */
service MRClientProtocolService {
rpc getJobReport (GetJobReportRequestProto) returns (GetJobReportResponseProto);
rpc getTaskReport (GetTaskReportRequestProto) returns (GetTaskReportResponseProto);

View File

@@ -552,6 +552,8 @@ protected class TaskReporter
private InputSplit split = null;
private Progress taskProgress;
private Thread pingThread = null;
private boolean done = true;
private Object lock = new Object();
/**
* flag that indicates whether progress update needs to be sent to parent.
@@ -648,6 +650,9 @@ public void run() {
// get current flag value and reset it as well
boolean sendProgress = resetProgressFlag();
while (!taskDone.get()) {
synchronized (lock) {
done = false;
}
try {
boolean taskFound = true; // whether TT knows about this task
// sleep for a bit
@@ -680,6 +685,7 @@ public void run() {
// came back up), kill ourselves
if (!taskFound) {
LOG.warn("Parent died. Exiting "+taskId);
resetDoneFlag();
System.exit(66);
}
@@ -692,10 +698,19 @@ public void run() {
if (remainingRetries == 0) {
ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
LOG.warn("Last retry, killing "+taskId);
resetDoneFlag();
System.exit(65);
}
}
}
//Notify that we are done with the work
resetDoneFlag();
}
void resetDoneFlag() {
synchronized (lock) {
done = true;
lock.notify();
}
}
public void startCommunicationThread() {
if (pingThread == null) {
@@ -706,6 +721,11 @@ public void startCommunicationThread() {
}
public void stopCommunicationThread() throws InterruptedException {
if (pingThread != null) {
synchronized (lock) {
while (!done) {
lock.wait();
}
}
pingThread.interrupt();
pingThread.join();
}
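
The done/lock pair added above is the MAPREDUCE-2450 fix: stopCommunicationThread() now waits for the ping loop to reach a safe point before interrupting it, so the interrupt can no longer land mid-RPC and leave the task waiting out a long timeout. The idiom in isolation, heavily simplified (no RPC, no retries, hypothetical class name):

public class GuardedStopSketch {
  private final Object lock = new Object();
  private boolean done = true;    // guarded by lock
  private boolean running = true; // guarded by lock

  private final Thread worker = new Thread(new Runnable() {
    @Override
    public void run() {
      while (true) {
        synchronized (lock) {
          if (!running) {
            return;
          }
          done = false; // entering the must-not-interrupt region
        }
        // ... one unit of work, e.g. a status ping to the parent ...
        synchronized (lock) {
          done = true;  // back at a safe point
          lock.notify();
        }
      }
    }
  });

  void start() {
    worker.start();
  }

  void stop() throws InterruptedException {
    synchronized (lock) {
      running = false;
      while (!done) { // wait for the worker's safe point
        lock.wait();
      }
    }
    worker.interrupt(); // can no longer land inside the work region
    worker.join();
  }

  public static void main(String[] args) throws InterruptedException {
    GuardedStopSketch s = new GuardedStopSketch();
    s.start();
    Thread.sleep(10);
    s.stop();
  }
}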

View File

@@ -35,6 +35,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,6 +50,7 @@
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
@@ -86,6 +88,9 @@ public class JobHistory extends AbstractService implements HistoryContext {
private static final Log LOG = LogFactory.getLog(JobHistory.class);
private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
public static final Pattern CONF_FILENAME_REGEX =
Pattern.compile("(" + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?");
public static final String OLD_SUFFIX = ".old";
private static String DONE_BEFORE_SERIAL_TAIL =
JobHistoryUtils.doneSubdirsBeforeSerialTail();
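
The new CONF_FILENAME_REGEX captures the job ID and tolerates an optional numeric ".old" suffix, so both a live conf file and a rolled-over copy are recognized. A quick check of the pattern, assuming JobID.JOBID_REGEX expands to the usual job_<timestamp>_<sequence> form:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ConfRegexSketch {
  public static void main(String[] args) {
    // Assumption: JobID.JOBID_REGEX is equivalent to "job_[0-9]+_[0-9]+".
    Pattern p = Pattern.compile(
        "(job_[0-9]+_[0-9]+)_conf.xml(?:\\.[0-9]+\\.old)?");
    String[] names = {
        "job_201201180001_0042_conf.xml",       // live conf file
        "job_201201180001_0042_conf.xml.1.old", // rolled-over copy
        "job_201201180001_0042.summary"         // unrelated file: no match
    };
    for (String name : names) {
      Matcher m = p.matcher(name);
      System.out.println(name + " -> "
          + (m.matches() ? "job id " + m.group(1) : "no match"));
    }
  }
}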

View File

@@ -28,6 +28,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.net.NetUtils;
@@ -91,7 +92,7 @@ protected MRClientProtocol instantiateHistoryProxy()
return currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
@Override
public MRClientProtocol run() {
return (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
NetUtils.createSocketAddr(serviceAddr), conf);
}
});

View File

@@ -38,6 +38,7 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.*;
import org.junit.Ignore;
/**
* Distributed i/o benchmark.
@@ -66,6 +67,7 @@
* <li>standard i/o rate deviation</li>
* </ul>
*/
@Ignore
public class DFSCIOTest extends TestCase {
// Constants
private static final Log LOG = LogFactory.getLog(DFSCIOTest.class);

View File

@@ -41,6 +41,7 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.*;
import org.junit.Ignore;
/**
* Distributed checkup of the file system consistency.
@@ -52,6 +53,7 @@
* Optionally displays statistics on read performance.
*
*/
@Ignore
public class DistributedFSCheck extends TestCase {
// Constants
private static final Log LOG = LogFactory.getLog(DistributedFSCheck.class);

View File

@@ -35,10 +35,12 @@
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.net.StandardSocketFactory;
import org.junit.Ignore;
/**
* This class checks that RPCs can use specialized socket factories.
*/
@Ignore
public class TestSocketFactory extends TestCase {
/**

View File

@@ -39,7 +39,8 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Ignore;
@Ignore
public class TestBadRecords extends ClusterMapReduceTestCase {
private static final Log LOG =

View File

@@ -20,9 +20,12 @@
import java.io.IOException;
import org.junit.Ignore;
/**
* Tests Job end notification in cluster mode.
*/
@Ignore
public class TestClusterMRNotification extends NotificationTestCase {
public TestClusterMRNotification() throws IOException {

View File

@@ -21,10 +21,11 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.junit.Ignore;
import java.io.*;
import java.util.Properties;
@Ignore
public class TestClusterMapReduceTestCase extends ClusterMapReduceTestCase {
public void _testMapReduce(boolean restart) throws Exception {
OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));

View File

@@ -28,12 +28,13 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Ignore;
/**
* check for the job submission options of
* -libjars -files -archives
*/
@Ignore
public class TestCommandLineJobSubmission extends TestCase {
// Input output paths for this..
// these are all dummy and does not test

View File

@@ -36,12 +36,13 @@
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Ignore;
import org.junit.Test;
import static junit.framework.Assert.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@Ignore
public class TestConcatenatedCompressedInput {
private static final Log LOG =
LogFactory.getLog(TestConcatenatedCompressedInput.class.getName());
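
The remaining hunks disable a batch of JUnit3-style tests wholesale with JUnit4's class-level @Ignore, which takes effect when the classes are executed by a JUnit4-aware runner. A minimal example of the mechanism:

import junit.framework.TestCase;
import org.junit.Ignore;

// Class-level @Ignore skips every test method in the class when a
// JUnit4-aware runner executes it; the method bodies stay in place.
@Ignore
public class ExampleDisabledTest extends TestCase {
  public void testNeverRuns() {
    fail("skipped while the class carries @Ignore");
  }
}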

Some files were not shown because too many files have changed in this diff.