MAPREDUCE-3339. Fixed MR AM to stop considering node blacklisting after the number of nodes blacklisted crosses a threshold. Contributed by Siddharth Seth.

svn merge -c 1221523 --ignore-ancestry ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1221524 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-12-20 23:27:57 +00:00
parent 30dea757e1
commit cc6328635b
13 changed files with 293 additions and 18 deletions

View File

@ -294,6 +294,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3376. Fixed Task to ensure it passes reporter to combiners using
old MR api. (Subroto Sanyal via acmurthy)
MAPREDUCE-3339. Fixed MR AM to stop considering node blacklisting after the
number of nodes blacklisted crosses a threshold. (Siddharth Seth via vinodkv)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -68,6 +68,7 @@ public abstract class RMCommunicator extends AbstractService {
protected ApplicationAttemptId applicationAttemptId;
private AtomicBoolean stopped;
protected Thread allocatorThread;
@SuppressWarnings("rawtypes")
protected EventHandler eventHandler;
protected AMRMProtocol scheduler;
private final ClientService clientService;

View File

@ -479,12 +479,16 @@ public class RMContainerAllocator extends RMContainerRequestor
//something changed
recalculateReduceSchedule = true;
}
List<Container> allocatedContainers = new ArrayList<Container>();
for (Container cont : newContainers) {
allocatedContainers.add(cont);
if (LOG.isDebugEnabled()) {
for (Container cont : newContainers) {
LOG.debug("Received new Container :" + cont);
}
}
//Called on each allocation. Will know about newly blacklisted/added hosts.
computeIgnoreBlacklisting();
for (ContainerStatus cont : finishedContainers) {
LOG.info("Received completed container " + cont);
TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());

View File

@ -18,15 +18,15 @@
package org.apache.hadoop.mapreduce.v2.app.rm;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.BuilderUtils;
/**
* Keeps the data structures to send container requests to RM.
*/
@ -74,9 +76,15 @@ public abstract class RMContainerRequestor extends RMCommunicator {
private final Set<ContainerId> release = new TreeSet<ContainerId>();
private boolean nodeBlacklistingEnabled;
private int blacklistDisablePercent;
private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false);
private int blacklistedNodeCount = 0;
private int lastClusterNmCount = 0;
private int clusterNmCount = 0;
private int maxTaskFailuresPerNode;
private final Map<String, Integer> nodeFailures = new HashMap<String, Integer>();
private final Set<String> blacklistedNodes = new HashSet<String>();
private final Set<String> blacklistedNodes = Collections
.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
public RMContainerRequestor(ClientService clientService, AppContext context) {
super(clientService, context);
@ -122,7 +130,17 @@ public abstract class RMContainerRequestor extends RMCommunicator {
LOG.info("nodeBlacklistingEnabled:" + nodeBlacklistingEnabled);
maxTaskFailuresPerNode =
conf.getInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3);
blacklistDisablePercent =
conf.getInt(
MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT);
LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
throw new YarnException("Invalid blacklistDisablePercent: "
+ blacklistDisablePercent
+ ". Should be an integer between 0 and 100 or -1 to disabled");
}
LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
}
protected AMResponse makeRemoteRequest() throws YarnRemoteException {
@ -134,19 +152,49 @@ public abstract class RMContainerRequestor extends RMCommunicator {
AMResponse response = allocateResponse.getAMResponse();
lastResponseID = response.getResponseId();
availableResources = response.getAvailableResources();
lastClusterNmCount = clusterNmCount;
clusterNmCount = allocateResponse.getNumClusterNodes();
LOG.info("getResources() for " + applicationId + ":" + " ask="
+ ask.size() + " release= " + release.size() +
" newContainers=" + response.getAllocatedContainers().size() +
" finishedContainers=" +
response.getCompletedContainersStatuses().size() +
" resourcelimit=" + availableResources);
" resourcelimit=" + availableResources +
"knownNMs=" + clusterNmCount);
ask.clear();
release.clear();
return response;
}
// May be incorrect if there's multiple NodeManagers running on a single host.
// knownNodeCount is based on node managers, not hosts. blacklisting is
// currently based on hosts.
protected void computeIgnoreBlacklisting() {
if (blacklistDisablePercent != -1
&& (blacklistedNodeCount != blacklistedNodes.size() ||
clusterNmCount != lastClusterNmCount)) {
blacklistedNodeCount = blacklistedNodes.size();
if (clusterNmCount == 0) {
LOG.info("KnownNode Count at 0. Not computing ignoreBlacklisting");
return;
}
int val = (int) ((float) blacklistedNodes.size() / clusterNmCount * 100);
if (val >= blacklistDisablePercent) {
if (ignoreBlacklisting.compareAndSet(false, true)) {
LOG.info("Ignore blacklisting set to true. Known: " + clusterNmCount
+ ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
}
} else {
if (ignoreBlacklisting.compareAndSet(true, false)) {
LOG.info("Ignore blacklisting set to false. Known: " + clusterNmCount
+ ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
}
}
}
}
protected void containerFailedOnHost(String hostName) {
if (!nodeBlacklistingEnabled) {
return;
@ -161,8 +209,10 @@ public abstract class RMContainerRequestor extends RMCommunicator {
LOG.info(failures + " failures on node " + hostName);
if (failures >= maxTaskFailuresPerNode) {
blacklistedNodes.add(hostName);
//Even if blacklisting is ignored, continue to remove the host from
// the request table. The RM may have additional nodes it can allocate on.
LOG.info("Blacklisted host " + hostName);
//remove all the requests corresponding to this hostname
for (Map<String, Map<Resource, ResourceRequest>> remoteRequests
: remoteRequestsTable.values()){
@ -316,7 +366,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
}
protected boolean isNodeBlacklisted(String hostname) {
if (!nodeBlacklistingEnabled) {
if (!nodeBlacklistingEnabled || ignoreBlacklisting.get()) {
return false;
}
return blacklistedNodes.contains(hostname);

View File

@ -488,6 +488,8 @@ public class TestRMContainerAllocator {
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
conf.setInt(
MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
@ -580,6 +582,175 @@ public class TestRMContainerAllocator {
}
}
@Test
public void testIgnoreBlacklisting() throws Exception {
LOG.info("Running testIgnoreBlacklisting");
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
conf.setInt(
MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, 33);
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
DrainDispatcher dispatcher =
(DrainDispatcher) rm.getRMContext().getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
dispatcher.await();
MockNM[] nodeManagers = new MockNM[10];
int nmNum = 0;
List<TaskAttemptContainerAssignedEvent> assigned = null;
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
nodeManagers[0].nodeHeartbeat(true);
dispatcher.await();
ApplicationAttemptId appAttemptId =
app.getCurrentAppAttempt().getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
dispatcher.await();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false));
MyContainerAllocator allocator =
new MyContainerAllocator(rm, conf, appAttemptId, mockJob);
// Known=1, blacklisted=0, ignore should be false - assign first container
assigned =
getContainerOnHost(jobId, 1, 1024, new String[] { "h1" },
nodeManagers[0], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
LOG.info("Failing container _1 on H1 (Node should be blacklisted and"
+ " ignore blacklisting enabled");
// Send events to blacklist nodes h1 and h2
ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
allocator.sendFailure(f1);
// Test single node.
// Known=1, blacklisted=1, ignore should be true - assign 1
assigned =
getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
nodeManagers[0], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
// Known=2, blacklisted=1, ignore should be true - assign 1 anyway.
assigned =
getContainerOnHost(jobId, 3, 1024, new String[] { "h2" },
nodeManagers[1], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
// Known=3, blacklisted=1, ignore should be true - assign 1 anyway.
assigned =
getContainerOnHost(jobId, 4, 1024, new String[] { "h3" },
nodeManagers[2], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
// Known=3, blacklisted=1, ignore should be true - assign 1
assigned =
getContainerOnHost(jobId, 5, 1024, new String[] { "h1" },
nodeManagers[0], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
// Known=4, blacklisted=1, ignore should be false - assign 1 anyway
assigned =
getContainerOnHost(jobId, 6, 1024, new String[] { "h4" },
nodeManagers[3], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
// Test blacklisting re-enabled.
// Known=4, blacklisted=1, ignore should be false - no assignment on h1
assigned =
getContainerOnHost(jobId, 7, 1024, new String[] { "h1" },
nodeManagers[0], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// RMContainerRequestor would have created a replacement request.
// Blacklist h2
ContainerFailedEvent f2 = createFailEvent(jobId, 3, "h2", false);
allocator.sendFailure(f2);
// Test ignore blacklisting re-enabled
// Known=4, blacklisted=2, ignore should be true. Should assign 2
// containers.
assigned =
getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
nodeManagers[0], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
// Known=4, blacklisted=2, ignore should be true.
assigned =
getContainerOnHost(jobId, 9, 1024, new String[] { "h2" },
nodeManagers[1], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
// Test blacklist while ignore blacklisting enabled
ContainerFailedEvent f3 = createFailEvent(jobId, 4, "h3", false);
allocator.sendFailure(f3);
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
// Known=5, blacklisted=3, ignore should be true.
assigned =
getContainerOnHost(jobId, 10, 1024, new String[] { "h3" },
nodeManagers[2], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
// Assign on 5 more nodes - to re-enable blacklisting
for (int i = 0; i < 5; i++) {
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
assigned =
getContainerOnHost(jobId, 11 + i, 1024,
new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i],
dispatcher, allocator);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
}
// Test h3 (blacklisted while ignoring blacklisting) is blacklisted.
assigned =
getContainerOnHost(jobId, 20, 1024, new String[] { "h3" },
nodeManagers[2], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
}
private MockNM registerNodeManager(int i, MyResourceManager rm,
DrainDispatcher dispatcher) throws Exception {
MockNM nm = rm.registerNode("h" + (i + 1) + ":1234", 10240);
dispatcher.await();
return nm;
}
private
List<TaskAttemptContainerAssignedEvent> getContainerOnHost(JobId jobId,
int taskAttemptId, int memory, String[] hosts, MockNM mockNM,
DrainDispatcher dispatcher, MyContainerAllocator allocator)
throws Exception {
ContainerRequestEvent reqEvent =
createReq(jobId, taskAttemptId, memory, hosts);
allocator.sendRequest(reqEvent);
// Send the request to the RM
List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
dispatcher.await();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// Heartbeat from the required nodeManager
mockNM.nodeHeartbeat(true);
dispatcher.await();
assigned = allocator.schedule();
dispatcher.await();
return assigned;
}
@Test
public void testBlackListedNodesWithSchedulingToThatNode() throws Exception {
LOG.info("Running testBlackListedNodesWithSchedulingToThatNode");
@ -587,6 +758,8 @@ public class TestRMContainerAllocator {
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
conf.setInt(
MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
MyResourceManager rm = new MyResourceManager(conf);
rm.start();

View File

@ -348,8 +348,14 @@ public interface MRJobConfig {
/** Enable blacklisting of nodes in the job.*/
public static final String MR_AM_JOB_NODE_BLACKLISTING_ENABLE =
MR_AM_PREFIX + "job.node.blacklisting.enable";
MR_AM_PREFIX + "job.node-blacklisting.enable";
/** Ignore blacklisting if a certain percentage of nodes have been blacklisted */
public static final String MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT =
MR_AM_PREFIX + "job.node-blacklisting.ignore-threshold-node-percent";
public static final int DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT =
33;
/** Enable job recovery.*/
public static final String MR_AM_JOB_RECOVERY_ENABLE =
MR_AM_PREFIX + "job.recovery.enable";

View File

@ -61,4 +61,17 @@ public interface AllocateResponse {
@Private
@Unstable
public abstract void setAMResponse(AMResponse amResponse);
/**
* Get the number of hosts available on the cluster.
* @return the available host count.
*/
@Public
@Stable
public int getNumClusterNodes();
@Private
@Unstable
public void setNumClusterNodes(int numNodes);
}

View File

@ -29,7 +29,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBui
public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto> implements AllocateResponse {
public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
implements AllocateResponse {
AllocateResponseProto proto = AllocateResponseProto.getDefaultInstance();
AllocateResponseProto.Builder builder = null;
boolean viaProto = false;
@ -95,7 +96,20 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto> imp
builder.clearAMResponse();
this.amResponse = aMResponse;
}
@Override
public int getNumClusterNodes() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
return p.getNumClusterNodes();
}
@Override
public void setNumClusterNodes(int numNodes) {
maybeInitBuilder();
builder.setNumClusterNodes(numNodes);
}
private AMResponsePBImpl convertFromProtoFormat(AMResponseProto p) {
return new AMResponsePBImpl(p);
}
@ -103,7 +117,4 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto> imp
private AMResponseProto convertToProtoFormat(AMResponse t) {
return ((AMResponsePBImpl)t).getProto();
}
}

View File

@ -59,6 +59,7 @@ message AllocateRequestProto {
message AllocateResponseProto {
optional AMResponseProto AM_response = 1;
optional int32 num_cluster_nodes = 2;
}

View File

@ -285,6 +285,7 @@ public class ApplicationMasterService extends AbstractService implements
response.setAvailableResources(allocation.getResourceLimit());
responseMap.put(appAttemptId, response);
allocateResponse.setAMResponse(response);
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
return allocateResponse;
}
}

View File

@ -79,6 +79,14 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
@Stable
public Resource getMaximumResourceCapability();
/**
* Get the number of nodes available in the cluster.
* @return the number of available nodes.
*/
@Public
@Stable
public int getNumClusterNodes();
/**
* The main api between the ApplicationMaster and the Scheduler.
* The ApplicationMaster is updating his future resource requirements

View File

@ -159,6 +159,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
return maximumAllocation;
}
@Override
public synchronized int getNumClusterNodes() {
return numNodeManagers;
}

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@ -36,7 +35,6 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.Lock;
@ -179,6 +177,11 @@ public class FifoScheduler implements ResourceScheduler {
return minimumAllocation;
}
@Override
public int getNumClusterNodes() {
return nodes.size();
}
@Override
public Resource getMaximumResourceCapability() {
return maximumAllocation;