Merge r1232271 through r1233740 from 0.23.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1233742 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-01-20 03:14:44 +00:00
commit f2a3dccbd8
26 changed files with 268 additions and 56 deletions

View File

@ -195,6 +195,9 @@ Release 0.23.1 - Unreleased
HADOOP-7974. TestViewFsTrash incorrectly determines the user's home
directory. (harsh via eli)
HADOOP-7971. Adding back job/pipes/queue commands to bin/hadoop for
backward compatibility. (Prashath Sharma via acmurthy)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -60,8 +60,24 @@ case $COMMAND in
elif [ -f "${HADOOP_PREFIX}"/bin/hdfs ]; then
exec "${HADOOP_PREFIX}"/bin/hdfs $*
else
echo "HDFS not found."
exit
echo "HADOOP_HDFS_HOME not found!"
exit 1
fi
;;
#mapred commands for backwards compatibility
pipes|job|queue)
echo "DEPRECATED: Use of this script to execute mapred command is deprecated." 1>&2
echo "Instead use the mapred command for it." 1>&2
echo "" 1>&2
#try to locate mapred and if present, delegate to it.
if [ -f "${HADOOP_MAPRED_HOME}"/bin/mapred ]; then
exec "${HADOOP_MAPRED_HOME}"/bin/mapred $*
elif [ -f "${HADOOP_PREFIX}"/bin/mapred ]; then
exec "${HADOOP_PREFIX}"/bin/mapred $*
else
echo "HADOOP_MAPRED_HOME not found!"
exit 1
fi
;;

View File

@ -193,6 +193,9 @@ Release 0.23.1 - UNRELEASED
not use ArrayWritable for writing non-array items. (Uma Maheswara Rao G
via szetszwo)
HDFS-2803. Add logging to LeaseRenewer for better lease expiration debugging.
(Jimmy Xiang via todd)
OPTIMIZATIONS
HDFS-2130. Switch default checksum to CRC32C. (todd)

View File

@ -292,6 +292,10 @@ class LeaseRenewer {
@Override
public void run() {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Lease renewer daemon for " + clientsString()
+ " with renew id " + id + " started");
}
LeaseRenewer.this.run(id);
} catch(InterruptedException e) {
if (LOG.isDebugEnabled()) {
@ -302,6 +306,10 @@ class LeaseRenewer {
synchronized(LeaseRenewer.this) {
Factory.INSTANCE.remove(LeaseRenewer.this);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Lease renewer daemon for " + clientsString()
+ " with renew id " + id + " exited");
}
}
}
@ -401,6 +409,9 @@ class LeaseRenewer {
if (!c.getClientName().equals(previousName)) {
c.renewLease();
previousName = c.getClientName();
if (LOG.isDebugEnabled()) {
LOG.debug("Lease renewed for client " + previousName);
}
}
}
}
@ -416,6 +427,10 @@ class LeaseRenewer {
if (System.currentTimeMillis() - lastRenewed >= getRenewalTime()) {
try {
renew();
if (LOG.isDebugEnabled()) {
LOG.debug("Lease renewer daemon for " + clientsString()
+ " with renew id " + id + " executed");
}
lastRenewed = System.currentTimeMillis();
} catch (SocketTimeoutException ie) {
LOG.warn("Failed to renew lease for " + clientsString() + " for "
@ -435,6 +450,15 @@ class LeaseRenewer {
synchronized(this) {
if (id != currentId || isRenewerExpired()) {
if (LOG.isDebugEnabled()) {
if (id != currentId) {
LOG.debug("Lease renewer daemon for " + clientsString()
+ " with renew id " + id + " is not current");
} else {
LOG.debug("Lease renewer daemon for " + clientsString()
+ " with renew id " + id + " expired");
}
}
//no longer the current daemon or expired
return;
}

View File

@ -135,6 +135,8 @@ Release 0.23.1 - Unreleased
assign only one off-switch container in a single scheduling
iteration. (Arun C Murthy via vinodkv)
MAPREDUCE-3692. yarn-resourcemanager out and log files can get big. (eli)
OPTIMIZATIONS
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
@ -465,6 +467,15 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3657. State machine visualize build fails. (Jason Lowe
via mahadev)
MAPREDUCE-2450. Fixed a corner case with interrupted communication threads
leading to a long timeout in Task. (Rajesh Balamohan via acmurthy)
MAPREDUCE-3669. Allow clients to talk to MR HistoryServer using both
delegation tokens and kerberos. (mahadev via acmurthy)
MAPREDUCE-3684. LocalDistributedCacheManager does not shut down its thread
pool (tomwhite)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -632,7 +632,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
* The only entry point to change the Job.
*/
public void handle(JobEvent event) {
LOG.info("Processing " + event.getJobId() + " of type " + event.getType());
LOG.debug("Processing " + event.getJobId() + " of type " + event.getType());
try {
writeLock.lock();
JobState oldState = getState();

View File

@ -537,7 +537,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
@Override
public void handle(TaskEvent event) {
LOG.info("Processing " + event.getTaskID() + " of type " + event.getType());
LOG.debug("Processing " + event.getTaskID() + " of type " + event.getType());
try {
writeLock.lock();
TaskState oldState = getState();

View File

@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -57,6 +58,8 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.FSDownload;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* A helper class for managing the distributed cache for {@link LocalJobRunner}.
*/
@ -111,43 +114,52 @@ class LocalDistributedCacheManager {
FileContext localFSFileContext = FileContext.getLocalFSFileContext();
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
ExecutorService exec = Executors.newCachedThreadPool();
Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
for (LocalResource resource : localResources.values()) {
Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
destPath, resource, new Random());
Future<Path> future = exec.submit(download);
resourcesToPaths.put(resource, future);
}
for (LocalResource resource : localResources.values()) {
Path path;
try {
path = resourcesToPaths.get(resource).get();
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e);
ExecutorService exec = null;
try {
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("LocalDistributedCacheManager Downloader #%d")
.build();
exec = Executors.newCachedThreadPool(tf);
Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
for (LocalResource resource : localResources.values()) {
Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
destPath, resource, new Random());
Future<Path> future = exec.submit(download);
resourcesToPaths.put(resource, future);
}
String pathString = path.toUri().toString();
if (resource.getType() == LocalResourceType.ARCHIVE) {
localArchives.add(pathString);
} else if (resource.getType() == LocalResourceType.FILE) {
localFiles.add(pathString);
for (LocalResource resource : localResources.values()) {
Path path;
try {
path = resourcesToPaths.get(resource).get();
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e);
}
String pathString = path.toUri().toString();
if (resource.getType() == LocalResourceType.ARCHIVE) {
localArchives.add(pathString);
} else if (resource.getType() == LocalResourceType.FILE) {
localFiles.add(pathString);
}
Path resourcePath;
try {
resourcePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
} catch (URISyntaxException e) {
throw new IOException(e);
}
LOG.info(String.format("Localized %s as %s", resourcePath, path));
String cp = resourcePath.toUri().getPath();
if (classpaths.keySet().contains(cp)) {
localClasspaths.add(path.toUri().getPath().toString());
}
}
Path resourcePath;
try {
resourcePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
} catch (URISyntaxException e) {
throw new IOException(e);
} finally {
if (exec != null) {
exec.shutdown();
}
LOG.info(String.format("Localized %s as %s", resourcePath, path));
String cp = resourcePath.toUri().getPath();
if (classpaths.keySet().contains(cp)) {
localClasspaths.add(path.toUri().getPath().toString());
}
}
}
// Update the configuration object with localized data.
if (!localArchives.isEmpty()) {
conf.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
@ -171,7 +183,7 @@ class LocalDistributedCacheManager {
}
setupCalled = true;
}
/**
* Are the resources that should be added to the classpath?
* Should be called after setup().

View File

@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -61,6 +62,8 @@ import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** Implements MapReduce locally, in-process, for debugging. */
@InterfaceAudience.Private
@InterfaceStability.Unstable
@ -302,7 +305,10 @@ public class LocalJobRunner implements ClientProtocol {
LOG.debug("Map tasks to process: " + this.numMapTasks);
// Create a new executor service to drain the work queue.
ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads);
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("LocalJobRunner Map Task Executor #%d")
.build();
ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads, tf);
return executor;
}

View File

@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api;
public interface HSClientProtocol extends MRClientProtocol {
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api.impl.pb.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
public class HSClientProtocolPBClientImpl extends MRClientProtocolPBClientImpl
implements HSClientProtocol {
public HSClientProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
super(clientVersion, addr, conf);
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api.impl.pb.service;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.yarn.proto.HSClientProtocol.HSClientProtocolService.BlockingInterface;
public class HSClientProtocolPBServiceImpl extends MRClientProtocolPBServiceImpl
implements BlockingInterface {
public HSClientProtocolPBServiceImpl(HSClientProtocol impl) {
super(impl);
}
}

View File

@ -27,14 +27,14 @@ import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.yarn.proto.MRClientProtocol;
import org.apache.hadoop.yarn.proto.HSClientProtocol;
public class ClientHSSecurityInfo extends SecurityInfo {
@Override
public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
if (!protocol
.equals(MRClientProtocol.MRClientProtocolService.BlockingInterface.class)) {
.equals(HSClientProtocol.HSClientProtocolService.BlockingInterface.class)) {
return null;
}
return new KerberosInfo() {
@ -59,7 +59,7 @@ public class ClientHSSecurityInfo extends SecurityInfo {
@Override
public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
if (!protocol
.equals(MRClientProtocol.MRClientProtocolService.BlockingInterface.class)) {
.equals(HSClientProtocol.HSClientProtocolService.BlockingInterface.class)) {
return null;
}
return new TokenInfo() {

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.proto;
/**
* Fake protocol to differentiate the blocking interfaces in the
* security info class loaders.
*/
public interface HSClientProtocol {
public abstract class HSClientProtocolService {
public interface BlockingInterface extends
MRClientProtocol.MRClientProtocolService.BlockingInterface {
}
}
}

View File

@ -22,6 +22,7 @@ option java_generic_services = true;
import "mr_service_protos.proto";
/* If making changes to this, please edit HSClientProtocolService */
service MRClientProtocolService {
rpc getJobReport (GetJobReportRequestProto) returns (GetJobReportResponseProto);
rpc getTaskReport (GetTaskReportRequestProto) returns (GetTaskReportResponseProto);

View File

@ -552,6 +552,8 @@ abstract public class Task implements Writable, Configurable {
private InputSplit split = null;
private Progress taskProgress;
private Thread pingThread = null;
private boolean done = true;
private Object lock = new Object();
/**
* flag that indicates whether progress update needs to be sent to parent.
@ -648,6 +650,9 @@ abstract public class Task implements Writable, Configurable {
// get current flag value and reset it as well
boolean sendProgress = resetProgressFlag();
while (!taskDone.get()) {
synchronized (lock) {
done = false;
}
try {
boolean taskFound = true; // whether TT knows about this task
// sleep for a bit
@ -680,6 +685,7 @@ abstract public class Task implements Writable, Configurable {
// came back up), kill ourselves
if (!taskFound) {
LOG.warn("Parent died. Exiting "+taskId);
resetDoneFlag();
System.exit(66);
}
@ -692,10 +698,19 @@ abstract public class Task implements Writable, Configurable {
if (remainingRetries == 0) {
ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
LOG.warn("Last retry, killing "+taskId);
resetDoneFlag();
System.exit(65);
}
}
}
//Notify that we are done with the work
resetDoneFlag();
}
void resetDoneFlag() {
synchronized (lock) {
done = true;
lock.notify();
}
}
public void startCommunicationThread() {
if (pingThread == null) {
@ -706,6 +721,11 @@ abstract public class Task implements Writable, Configurable {
}
public void stopCommunicationThread() throws InterruptedException {
if (pingThread != null) {
synchronized (lock) {
while (!done) {
lock.wait();
}
}
pingThread.interrupt();
pingThread.join();
}

View File

@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.net.NetUtils;
@ -91,7 +92,7 @@ public class ClientCache {
return currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
@Override
public MRClientProtocol run() {
return (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
NetUtils.createSocketAddr(serviceAddr), conf);
}
});

View File

@ -315,11 +315,10 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
ProtoSpecificRequestWritable request = (ProtoSpecificRequestWritable) writableRequest;
ProtoSpecificRpcRequest rpcRequest = request.message;
String methodName = rpcRequest.getMethodName();
System.out.println("Call: protocol=" + protocol + ", method="
+ methodName);
if (verbose)
if (verbose) {
log("Call: protocol=" + protocol + ", method="
+ methodName);
}
MethodDescriptor methodDescriptor = service.getDescriptorForType()
.findMethodByName(methodName);
if (methodDescriptor == null) {

View File

@ -373,7 +373,7 @@ public class ApplicationImpl implements Application {
try {
ApplicationId applicationID = event.getApplicationID();
LOG.info("Processing " + applicationID + " of type " + event.getType());
LOG.debug("Processing " + applicationID + " of type " + event.getType());
ApplicationState oldState = stateMachine.getCurrentState();
ApplicationState newState = null;

View File

@ -811,7 +811,7 @@ public class ContainerImpl implements Container {
this.writeLock.lock();
ContainerId containerID = event.getContainerID();
LOG.info("Processing " + containerID + " of type " + event.getType());
LOG.debug("Processing " + containerID + " of type " + event.getType());
ContainerState oldState = stateMachine.getCurrentState();
ContainerState newState = null;

View File

@ -181,7 +181,7 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
this.writeLock.lock();
Path resourcePath = event.getLocalResourceRequest().getPath();
LOG.info("Processing " + resourcePath + " of type " + event.getType());
LOG.debug("Processing " + resourcePath + " of type " + event.getType());
ResourceState oldState = this.stateMachine.getCurrentState();
ResourceState newState = null;

View File

@ -413,7 +413,7 @@ public class RMAppImpl implements RMApp {
try {
ApplicationId appID = event.getApplicationId();
LOG.info("Processing event for " + appID + " of type "
LOG.debug("Processing event for " + appID + " of type "
+ event.getType());
final RMAppState oldState = getState();
try {

View File

@ -468,7 +468,7 @@ public class RMAppAttemptImpl implements RMAppAttempt {
try {
ApplicationAttemptId appAttemptID = event.getApplicationAttemptId();
LOG.info("Processing event for " + appAttemptID + " of type "
LOG.debug("Processing event for " + appAttemptID + " of type "
+ event.getType());
final RMAppAttemptState oldState = getAppAttemptState();
try {

View File

@ -192,7 +192,7 @@ public class RMContainerImpl implements RMContainer {
@Override
public void handle(RMContainerEvent event) {
LOG.info("Processing " + event.getContainerId() + " of type " + event.getType());
LOG.debug("Processing " + event.getContainerId() + " of type " + event.getType());
try {
writeLock.lock();
RMContainerState oldState = getState();

View File

@ -283,7 +283,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
public void handle(RMNodeEvent event) {
LOG.info("Processing " + event.getNodeId() + " of type " + event.getType());
LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType());
try {
writeLock.lock();
RMNodeState oldState = getState();

View File

@ -575,12 +575,12 @@ public class FifoScheduler implements ResourceScheduler {
if (Resources.greaterThanOrEqual(node.getAvailableResource(),
minimumAllocation)) {
LOG.info("Node heartbeat " + rmNode.getNodeID() +
LOG.debug("Node heartbeat " + rmNode.getNodeID() +
" available resource = " + node.getAvailableResource());
assignContainers(node);
LOG.info("Node after allocation " + rmNode.getNodeID() + " resource = "
LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
+ node.getAvailableResource());
}