MAPREDUCE-3339. Fixed MR AM to stop considering node blacklisting after the number of nodes blacklisted crosses a threshold. Contributed by Siddharth Seth.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1221523 13f79535-47bb-0310-9956-ffa450edef68
commit e7543b944c
parent 4e1d5a0d71
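The change, in outline: each allocate response now carries the cluster's NodeManager count, and whenever the blacklist or the cluster size changes the AM recomputes the blacklisted percentage, ignoring the blacklist once it reaches the configured threshold (default 33%) and re-enabling it if the percentage drops back below. A hedged, self-contained sketch of that check (names follow the patch below, but this is an illustration, not the committed method, which tracks deltas in fields, flips an AtomicBoolean, and logs transitions):

    // Hedged sketch of the threshold test this patch introduces in
    // RMContainerRequestor.computeIgnoreBlacklisting(); illustration only.
    public class IgnoreBlacklistingSketch {
      static boolean shouldIgnoreBlacklisting(int blacklistedNodeCount,
          int clusterNmCount, int blacklistDisablePercent) {
        if (blacklistDisablePercent == -1 || clusterNmCount == 0) {
          return false; // -1 disables the ignore-threshold; 0 nodes: nothing to compute
        }
        int val = (int) ((float) blacklistedNodeCount / clusterNmCount * 100);
        return val >= blacklistDisablePercent;
      }

      public static void main(String[] args) {
        // With 10 NodeManagers and the default threshold of 33%, the 4th
        // blacklisted node (40%) makes the AM start ignoring the blacklist.
        for (int blacklisted = 0; blacklisted <= 4; blacklisted++) {
          System.out.println(blacklisted + " blacklisted of 10 -> ignore="
              + shouldIgnoreBlacklisting(blacklisted, 10, 33));
        }
      }
    }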
CHANGES.txt
@@ -343,6 +343,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3376. Fixed Task to ensure it passes reporter to combiners using
     old MR api. (Subroto Sanyal via acmurthy)
 
+    MAPREDUCE-3339. Fixed MR AM to stop considering node blacklisting after the
+    number of nodes blacklisted crosses a threshold. (Siddharth Seth via vinodkv)
+
 Release 0.23.0 - 2011-11-01
 
 INCOMPATIBLE CHANGES
RMCommunicator.java
@@ -68,6 +68,7 @@ public abstract class RMCommunicator extends AbstractService {
   protected ApplicationAttemptId applicationAttemptId;
   private AtomicBoolean stopped;
   protected Thread allocatorThread;
+  @SuppressWarnings("rawtypes")
   protected EventHandler eventHandler;
   protected AMRMProtocol scheduler;
   private final ClientService clientService;
RMContainerAllocator.java
@@ -479,12 +479,16 @@ public class RMContainerAllocator extends RMContainerRequestor
       //something changed
       recalculateReduceSchedule = true;
     }
 
-    List<Container> allocatedContainers = new ArrayList<Container>();
-    for (Container cont : newContainers) {
-      allocatedContainers.add(cont);
-    }
+    if (LOG.isDebugEnabled()) {
+      for (Container cont : newContainers) {
+        LOG.debug("Received new Container :" + cont);
+      }
+    }
+
+    //Called on each allocation. Will know about newly blacklisted/added hosts.
+    computeIgnoreBlacklisting();
 
     for (ContainerStatus cont : finishedContainers) {
       LOG.info("Received completed container " + cont);
       TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());
RMContainerRequestor.java
@@ -18,15 +18,15 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.AMResponse;
@@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 
+
 /**
  * Keeps the data structures to send container requests to RM.
  */
@@ -74,9 +76,15 @@ public abstract class RMContainerRequestor extends RMCommunicator {
   private final Set<ContainerId> release = new TreeSet<ContainerId>();
 
   private boolean nodeBlacklistingEnabled;
+  private int blacklistDisablePercent;
+  private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false);
+  private int blacklistedNodeCount = 0;
+  private int lastClusterNmCount = 0;
+  private int clusterNmCount = 0;
   private int maxTaskFailuresPerNode;
   private final Map<String, Integer> nodeFailures = new HashMap<String, Integer>();
-  private final Set<String> blacklistedNodes = new HashSet<String>();
+  private final Set<String> blacklistedNodes = Collections
+      .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
 
   public RMContainerRequestor(ClientService clientService, AppContext context) {
     super(clientService, context);
@@ -122,7 +130,17 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     LOG.info("nodeBlacklistingEnabled:" + nodeBlacklistingEnabled);
     maxTaskFailuresPerNode =
         conf.getInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3);
+    blacklistDisablePercent =
+        conf.getInt(
+            MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
+            MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT);
     LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
+    if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
+      throw new YarnException("Invalid blacklistDisablePercent: "
+          + blacklistDisablePercent
+          + ". Should be an integer between 0 and 100 or -1 to disabled");
+    }
+    LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
   }
 
   protected AMResponse makeRemoteRequest() throws YarnRemoteException {
@@ -134,19 +152,49 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     AMResponse response = allocateResponse.getAMResponse();
     lastResponseID = response.getResponseId();
     availableResources = response.getAvailableResources();
+    lastClusterNmCount = clusterNmCount;
+    clusterNmCount = allocateResponse.getNumClusterNodes();
 
     LOG.info("getResources() for " + applicationId + ":" + " ask="
         + ask.size() + " release= " + release.size() +
         " newContainers=" + response.getAllocatedContainers().size() +
         " finishedContainers=" +
        response.getCompletedContainersStatuses().size() +
-        " resourcelimit=" + availableResources);
+        " resourcelimit=" + availableResources +
+        "knownNMs=" + clusterNmCount);
 
     ask.clear();
     release.clear();
     return response;
   }
 
+  // May be incorrect if there's multiple NodeManagers running on a single host.
+  // knownNodeCount is based on node managers, not hosts. blacklisting is
+  // currently based on hosts.
+  protected void computeIgnoreBlacklisting() {
+    if (blacklistDisablePercent != -1
+        && (blacklistedNodeCount != blacklistedNodes.size() ||
+            clusterNmCount != lastClusterNmCount)) {
+      blacklistedNodeCount = blacklistedNodes.size();
+      if (clusterNmCount == 0) {
+        LOG.info("KnownNode Count at 0. Not computing ignoreBlacklisting");
+        return;
+      }
+      int val = (int) ((float) blacklistedNodes.size() / clusterNmCount * 100);
+      if (val >= blacklistDisablePercent) {
+        if (ignoreBlacklisting.compareAndSet(false, true)) {
+          LOG.info("Ignore blacklisting set to true. Known: " + clusterNmCount
+              + ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
+        }
+      } else {
+        if (ignoreBlacklisting.compareAndSet(true, false)) {
+          LOG.info("Ignore blacklisting set to false. Known: " + clusterNmCount
+              + ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
+        }
+      }
+    }
+  }
+
   protected void containerFailedOnHost(String hostName) {
     if (!nodeBlacklistingEnabled) {
       return;
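The caveat in the comment above can be made concrete: blacklistedNodes counts hosts while clusterNmCount counts NodeManagers, so running several NMs per host dilutes the computed percentage. A hedged arithmetic fragment (not from the patch) showing the skew against the default 33% threshold:

    // Hypothetical cluster: 10 hosts, 2 NodeManagers each, threshold 33%.
    int clusterNmCount = 20;   // NodeManagers reported by the RM
    int blacklistedHosts = 5;  // half of all physical hosts are blacklisted
    int val = (int) ((float) blacklistedHosts / clusterNmCount * 100);
    // val == 25: below 33, so the blacklist stays in force even though
    // 50% of the hosts are blacklisted.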
@@ -161,8 +209,10 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     LOG.info(failures + " failures on node " + hostName);
     if (failures >= maxTaskFailuresPerNode) {
       blacklistedNodes.add(hostName);
+      //Even if blacklisting is ignored, continue to remove the host from
+      // the request table. The RM may have additional nodes it can allocate on.
       LOG.info("Blacklisted host " + hostName);
 
       //remove all the requests corresponding to this hostname
       for (Map<String, Map<Resource, ResourceRequest>> remoteRequests
           : remoteRequestsTable.values()){
@@ -316,7 +366,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
   }
 
   protected boolean isNodeBlacklisted(String hostname) {
-    if (!nodeBlacklistingEnabled) {
+    if (!nodeBlacklistingEnabled || ignoreBlacklisting.get()) {
       return false;
     }
     return blacklistedNodes.contains(hostname);
TestRMContainerAllocator.java
@@ -488,6 +488,8 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
     conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+    conf.setInt(
+        MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
 
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
@@ -580,6 +582,175 @@ public class TestRMContainerAllocator {
     }
   }
 
+  @Test
+  public void testIgnoreBlacklisting() throws Exception {
+    LOG.info("Running testIgnoreBlacklisting");
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+    conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+    conf.setInt(
+        MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, 33);
+
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher =
+        (DrainDispatcher) rm.getRMContext().getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM[] nodeManagers = new MockNM[10];
+    int nmNum = 0;
+    List<TaskAttemptContainerAssignedEvent> assigned = null;
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    nodeManagers[0].nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId =
+        app.getCurrentAppAttempt().getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+    MyContainerAllocator allocator =
+        new MyContainerAllocator(rm, conf, appAttemptId, mockJob);
+
+    // Known=1, blacklisted=0, ignore should be false - assign first container
+    assigned =
+        getContainerOnHost(jobId, 1, 1024, new String[] { "h1" },
+            nodeManagers[0], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+    LOG.info("Failing container _1 on H1 (Node should be blacklisted and"
+        + " ignore blacklisting enabled");
+    // Send events to blacklist nodes h1 and h2
+    ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
+    allocator.sendFailure(f1);
+
+    // Test single node.
+    // Known=1, blacklisted=1, ignore should be true - assign 1
+    assigned =
+        getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
+            nodeManagers[0], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    // Known=2, blacklisted=1, ignore should be true - assign 1 anyway.
+    assigned =
+        getContainerOnHost(jobId, 3, 1024, new String[] { "h2" },
+            nodeManagers[1], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    // Known=3, blacklisted=1, ignore should be true - assign 1 anyway.
+    assigned =
+        getContainerOnHost(jobId, 4, 1024, new String[] { "h3" },
+            nodeManagers[2], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+    // Known=3, blacklisted=1, ignore should be true - assign 1
+    assigned =
+        getContainerOnHost(jobId, 5, 1024, new String[] { "h1" },
+            nodeManagers[0], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    // Known=4, blacklisted=1, ignore should be false - assign 1 anyway
+    assigned =
+        getContainerOnHost(jobId, 6, 1024, new String[] { "h4" },
+            nodeManagers[3], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+    // Test blacklisting re-enabled.
+    // Known=4, blacklisted=1, ignore should be false - no assignment on h1
+    assigned =
+        getContainerOnHost(jobId, 7, 1024, new String[] { "h1" },
+            nodeManagers[0], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+    // RMContainerRequestor would have created a replacement request.
+
+    // Blacklist h2
+    ContainerFailedEvent f2 = createFailEvent(jobId, 3, "h2", false);
+    allocator.sendFailure(f2);
+
+    // Test ignore blacklisting re-enabled
+    // Known=4, blacklisted=2, ignore should be true. Should assign 2
+    // containers.
+    assigned =
+        getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
+            nodeManagers[0], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
+
+    // Known=4, blacklisted=2, ignore should be true.
+    assigned =
+        getContainerOnHost(jobId, 9, 1024, new String[] { "h2" },
+            nodeManagers[1], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+    // Test blacklist while ignore blacklisting enabled
+    ContainerFailedEvent f3 = createFailEvent(jobId, 4, "h3", false);
+    allocator.sendFailure(f3);
+
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    // Known=5, blacklisted=3, ignore should be true.
+    assigned =
+        getContainerOnHost(jobId, 10, 1024, new String[] { "h3" },
+            nodeManagers[2], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+    // Assign on 5 more nodes - to re-enable blacklisting
+    for (int i = 0; i < 5; i++) {
+      nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+      assigned =
+          getContainerOnHost(jobId, 11 + i, 1024,
+              new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i],
+              dispatcher, allocator);
+      Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+    }
+
+    // Test h3 (blacklisted while ignoring blacklisting) is blacklisted.
+    assigned =
+        getContainerOnHost(jobId, 20, 1024, new String[] { "h3" },
+            nodeManagers[2], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+  }
+
+  private MockNM registerNodeManager(int i, MyResourceManager rm,
+      DrainDispatcher dispatcher) throws Exception {
+    MockNM nm = rm.registerNode("h" + (i + 1) + ":1234", 10240);
+    dispatcher.await();
+    return nm;
+  }
+
+  private
+      List<TaskAttemptContainerAssignedEvent> getContainerOnHost(JobId jobId,
+          int taskAttemptId, int memory, String[] hosts, MockNM mockNM,
+          DrainDispatcher dispatcher, MyContainerAllocator allocator)
+          throws Exception {
+    ContainerRequestEvent reqEvent =
+        createReq(jobId, taskAttemptId, memory, hosts);
+    allocator.sendRequest(reqEvent);
+
+    // Send the request to the RM
+    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    // Heartbeat from the required nodeManager
+    mockNM.nodeHeartbeat(true);
+    dispatcher.await();
+
+    assigned = allocator.schedule();
+    dispatcher.await();
+    return assigned;
+  }
+
   @Test
   public void testBlackListedNodesWithSchedulingToThatNode() throws Exception {
     LOG.info("Running testBlackListedNodesWithSchedulingToThatNode");
@@ -587,6 +758,8 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
     conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+    conf.setInt(
+        MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
 
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
MRJobConfig.java
@@ -348,8 +348,14 @@ public interface MRJobConfig {
 
   /** Enable blacklisting of nodes in the job.*/
   public static final String MR_AM_JOB_NODE_BLACKLISTING_ENABLE =
-    MR_AM_PREFIX + "job.node.blacklisting.enable";
+    MR_AM_PREFIX + "job.node-blacklisting.enable";
+
+  /** Ignore blacklisting if a certain percentage of nodes have been blacklisted */
+  public static final String MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT =
+    MR_AM_PREFIX + "job.node-blacklisting.ignore-threshold-node-percent";
+  public static final int DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT =
+    33;
 
   /** Enable job recovery.*/
   public static final String MR_AM_JOB_RECOVERY_ENABLE =
     MR_AM_PREFIX + "job.recovery.enable";
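For reference, a job opts in through the two keys defined above; a hedged usage fragment mirroring the conf calls in the tests above (MR_AM_PREFIX is "yarn.app.mapreduce.am.", so the new key reads yarn.app.mapreduce.am.job.node-blacklisting.ignore-threshold-node-percent):

    // Enable AM node-blacklisting and tune the new ignore threshold.
    Configuration conf = new Configuration();
    conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
    // Ignore the blacklist once half the NodeManagers are blacklisted;
    // 33 is the patch's default, -1 keeps blacklisting unconditional.
    conf.setInt(MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, 50);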
AllocateResponse.java
@@ -61,4 +61,17 @@ public interface AllocateResponse {
   @Private
   @Unstable
   public abstract void setAMResponse(AMResponse amResponse);
+
+
+  /**
+   * Get the number of hosts available on the cluster.
+   * @return the available host count.
+   */
+  @Public
+  @Stable
+  public int getNumClusterNodes();
+
+  @Private
+  @Unstable
+  public void setNumClusterNodes(int numNodes);
 }
AllocateResponsePBImpl.java
@@ -29,7 +29,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder;
 
 
 
-public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto> implements AllocateResponse {
+public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
+    implements AllocateResponse {
   AllocateResponseProto proto = AllocateResponseProto.getDefaultInstance();
   AllocateResponseProto.Builder builder = null;
   boolean viaProto = false;
@@ -95,7 +96,20 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto> implements AllocateResponse {
     builder.clearAMResponse();
     this.amResponse = aMResponse;
   }
 
+  @Override
+  public int getNumClusterNodes() {
+    AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getNumClusterNodes();
+  }
+
+  @Override
+  public void setNumClusterNodes(int numNodes) {
+    maybeInitBuilder();
+    builder.setNumClusterNodes(numNodes);
+  }
+
+
   private AMResponsePBImpl convertFromProtoFormat(AMResponseProto p) {
     return new AMResponsePBImpl(p);
   }
@@ -103,7 +117,4 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto> implements AllocateResponse {
   private AMResponseProto convertToProtoFormat(AMResponse t) {
     return ((AMResponsePBImpl)t).getProto();
   }
-
-
-
 }
yarn_service_protos.proto
@@ -59,6 +59,7 @@ message AllocateRequestProto {
 
 message AllocateResponseProto {
   optional AMResponseProto AM_response = 1;
+  optional int32 num_cluster_nodes = 2;
 }
 
 
ApplicationMasterService.java
@@ -285,6 +285,7 @@ public class ApplicationMasterService extends AbstractService implements
       response.setAvailableResources(allocation.getResourceLimit());
       responseMap.put(appAttemptId, response);
       allocateResponse.setAMResponse(response);
+      allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
       return allocateResponse;
     }
   }
YarnScheduler.java
@@ -79,6 +79,14 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
   @Stable
   public Resource getMaximumResourceCapability();
 
+  /**
+   * Get the number of nodes available in the cluster.
+   * @return the number of available nodes.
+   */
+  @Public
+  @Stable
+  public int getNumClusterNodes();
+
   /**
    * The main api between the ApplicationMaster and the Scheduler.
    * The ApplicationMaster is updating his future resource requirements
CapacityScheduler.java
@@ -159,6 +159,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
     return maximumAllocation;
   }
 
+  @Override
   public synchronized int getNumClusterNodes() {
     return numNodeManagers;
   }
FifoScheduler.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -36,7 +35,6 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.Lock;
@@ -179,6 +177,11 @@ public class FifoScheduler implements ResourceScheduler {
     return minimumAllocation;
   }
 
+  @Override
+  public int getNumClusterNodes() {
+    return nodes.size();
+  }
+
   @Override
   public Resource getMaximumResourceCapability() {
     return maximumAllocation;