From cc6328635b70d3bfa40e659733ff08691d2a6a26 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 20 Dec 2011 23:27:57 +0000 Subject: [PATCH] MAPREDUCE-3339. Fixed MR AM to stop considering node blacklisting after the number of nodes blacklisted crosses a threshold. Contributed by Siddharth Seth. svn merge -c 1221523 --ignore-ancestry ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1221524 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapreduce/v2/app/rm/RMCommunicator.java | 1 + .../v2/app/rm/RMContainerAllocator.java | 12 +- .../v2/app/rm/RMContainerRequestor.java | 64 ++++++- .../v2/app/TestRMContainerAllocator.java | 173 ++++++++++++++++++ .../apache/hadoop/mapreduce/MRJobConfig.java | 8 +- .../api/protocolrecords/AllocateResponse.java | 13 ++ .../impl/pb/AllocateResponsePBImpl.java | 19 +- .../src/main/proto/yarn_service_protos.proto | 1 + .../ApplicationMasterService.java | 1 + .../scheduler/YarnScheduler.java | 8 + .../scheduler/capacity/CapacityScheduler.java | 1 + .../scheduler/fifo/FifoScheduler.java | 7 +- 13 files changed, 293 insertions(+), 18 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index ce67fd51c11..f8dea3ef2b0 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -294,6 +294,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3376. Fixed Task to ensure it passes reporter to combiners using old MR api. (Subroto Sanyal via acmurthy) + MAPREDUCE-3339. Fixed MR AM to stop considering node blacklisting after the + number of nodes blacklisted crosses a threshold. 
(Siddharth Seth via vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 5028355acf3..4281e0a4842 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -68,6 +68,7 @@ public abstract class RMCommunicator extends AbstractService { protected ApplicationAttemptId applicationAttemptId; private AtomicBoolean stopped; protected Thread allocatorThread; + @SuppressWarnings("rawtypes") protected EventHandler eventHandler; protected AMRMProtocol scheduler; private final ClientService clientService; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 81a5a75b503..d55dc2981f7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -479,12 +479,16 @@ public class RMContainerAllocator extends RMContainerRequestor //something changed recalculateReduceSchedule = true; } - - List allocatedContainers = new ArrayList(); - for (Container cont : newContainers) 
{ - allocatedContainers.add(cont); + + if (LOG.isDebugEnabled()) { + for (Container cont : newContainers) { LOG.debug("Received new Container :" + cont); + } } + + //Called on each allocation. Will know about newly blacklisted/added hosts. + computeIgnoreBlacklisting(); + for (ContainerStatus cont : finishedContainers) { LOG.info("Received completed container " + cont); TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index 6c03c6690cf..2f25075ee84 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -18,15 +18,15 @@ package org.apache.hadoop.mapreduce.v2.app.rm; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; +import org.apache.hadoop.yarn.YarnException; import 
org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.AMResponse; @@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.BuilderUtils; + /** * Keeps the data structures to send container requests to RM. */ @@ -74,9 +76,15 @@ public abstract class RMContainerRequestor extends RMCommunicator { private final Set release = new TreeSet(); private boolean nodeBlacklistingEnabled; + private int blacklistDisablePercent; + private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false); + private int blacklistedNodeCount = 0; + private int lastClusterNmCount = 0; + private int clusterNmCount = 0; private int maxTaskFailuresPerNode; private final Map nodeFailures = new HashMap(); - private final Set blacklistedNodes = new HashSet(); + private final Set blacklistedNodes = Collections + .newSetFromMap(new ConcurrentHashMap()); public RMContainerRequestor(ClientService clientService, AppContext context) { super(clientService, context); @@ -122,7 +130,17 @@ public abstract class RMContainerRequestor extends RMCommunicator { LOG.info("nodeBlacklistingEnabled:" + nodeBlacklistingEnabled); maxTaskFailuresPerNode = conf.getInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3); + blacklistDisablePercent = + conf.getInt( + MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, + MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT); LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode); + if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) { + throw new YarnException("Invalid blacklistDisablePercent: " + + blacklistDisablePercent + + ". 
Should be an integer between 0 and 100 or -1 to disable"); + } + LOG.info("blacklistDisablePercent is " + blacklistDisablePercent); } protected AMResponse makeRemoteRequest() throws YarnRemoteException { @@ -134,19 +152,49 @@ public abstract class RMContainerRequestor extends RMCommunicator { AMResponse response = allocateResponse.getAMResponse(); lastResponseID = response.getResponseId(); availableResources = response.getAvailableResources(); + lastClusterNmCount = clusterNmCount; + clusterNmCount = allocateResponse.getNumClusterNodes(); LOG.info("getResources() for " + applicationId + ":" + " ask=" + ask.size() + " release= " + release.size() + " newContainers=" + response.getAllocatedContainers().size() + " finishedContainers=" + response.getCompletedContainersStatuses().size() + - " resourcelimit=" + availableResources); + " resourcelimit=" + availableResources + + " knownNMs=" + clusterNmCount); ask.clear(); release.clear(); return response; } + // May be incorrect if there's multiple NodeManagers running on a single host. + // knownNodeCount is based on node managers, not hosts. blacklisting is + // currently based on hosts. + protected void computeIgnoreBlacklisting() { + if (blacklistDisablePercent != -1 + && (blacklistedNodeCount != blacklistedNodes.size() || + clusterNmCount != lastClusterNmCount)) { + blacklistedNodeCount = blacklistedNodes.size(); + if (clusterNmCount == 0) { + LOG.info("KnownNode Count at 0. Not computing ignoreBlacklisting"); + return; + } + int val = (int) ((float) blacklistedNodes.size() / clusterNmCount * 100); + if (val >= blacklistDisablePercent) { + if (ignoreBlacklisting.compareAndSet(false, true)) { + LOG.info("Ignore blacklisting set to true. Known: " + clusterNmCount + + ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%"); + } + } else { + if (ignoreBlacklisting.compareAndSet(true, false)) { + LOG.info("Ignore blacklisting set to false. 
Known: " + clusterNmCount + + ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%"); + } + } + } + } + protected void containerFailedOnHost(String hostName) { if (!nodeBlacklistingEnabled) { return; @@ -161,8 +209,10 @@ public abstract class RMContainerRequestor extends RMCommunicator { LOG.info(failures + " failures on node " + hostName); if (failures >= maxTaskFailuresPerNode) { blacklistedNodes.add(hostName); + //Even if blacklisting is ignored, continue to remove the host from + // the request table. The RM may have additional nodes it can allocate on. LOG.info("Blacklisted host " + hostName); - + //remove all the requests corresponding to this hostname for (Map> remoteRequests : remoteRequestsTable.values()){ @@ -316,7 +366,7 @@ public abstract class RMContainerRequestor extends RMCommunicator { } protected boolean isNodeBlacklisted(String hostname) { - if (!nodeBlacklistingEnabled) { + if (!nodeBlacklistingEnabled || ignoreBlacklisting.get()) { return false; } return blacklistedNodes.contains(hostname); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index c9436e5645a..785d8a7d03f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -488,6 +488,8 @@ public class TestRMContainerAllocator { Configuration conf = new Configuration(); conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true); conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1); + conf.setInt( + 
MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1); MyResourceManager rm = new MyResourceManager(conf); rm.start(); @@ -580,6 +582,175 @@ public class TestRMContainerAllocator { } } + @Test + public void testIgnoreBlacklisting() throws Exception { + LOG.info("Running testIgnoreBlacklisting"); + + Configuration conf = new Configuration(); + conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true); + conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1); + conf.setInt( + MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, 33); + + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = + (DrainDispatcher) rm.getRMContext().getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM[] nodeManagers = new MockNM[10]; + int nmNum = 0; + List assigned = null; + nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); + nodeManagers[0].nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = + app.getCurrentAppAttempt().getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false)); + MyContainerAllocator allocator = + new MyContainerAllocator(rm, conf, appAttemptId, mockJob); + + // Known=1, blacklisted=0, ignore should be false - assign first container + assigned = + getContainerOnHost(jobId, 1, 1024, new String[] { "h1" }, + nodeManagers[0], dispatcher, allocator); + Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); + + LOG.info("Failing container _1 on H1 (Node should be blacklisted and" + + " ignore blacklisting enabled"); + // Send events to blacklist nodes h1 and h2 + ContainerFailedEvent 
f1 = createFailEvent(jobId, 1, "h1", false); + allocator.sendFailure(f1); + + // Test single node. + // Known=1, blacklisted=1, ignore should be true - assign 1 + assigned = + getContainerOnHost(jobId, 2, 1024, new String[] { "h1" }, + nodeManagers[0], dispatcher, allocator); + Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); + + nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); + // Known=2, blacklisted=1, ignore should be true - assign 1 anyway. + assigned = + getContainerOnHost(jobId, 3, 1024, new String[] { "h2" }, + nodeManagers[1], dispatcher, allocator); + Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); + + nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); + // Known=3, blacklisted=1, ignore should be true - assign 1 anyway. + assigned = + getContainerOnHost(jobId, 4, 1024, new String[] { "h3" }, + nodeManagers[2], dispatcher, allocator); + Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); + + // Known=3, blacklisted=1, ignore should be true - assign 1 + assigned = + getContainerOnHost(jobId, 5, 1024, new String[] { "h1" }, + nodeManagers[0], dispatcher, allocator); + Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); + + nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); + // Known=4, blacklisted=1, ignore should be false - assign 1 anyway + assigned = + getContainerOnHost(jobId, 6, 1024, new String[] { "h4" }, + nodeManagers[3], dispatcher, allocator); + Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); + + // Test blacklisting re-enabled. + // Known=4, blacklisted=1, ignore should be false - no assignment on h1 + assigned = + getContainerOnHost(jobId, 7, 1024, new String[] { "h1" }, + nodeManagers[0], dispatcher, allocator); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + // RMContainerRequestor would have created a replacement request. 
+ + // Blacklist h2 + ContainerFailedEvent f2 = createFailEvent(jobId, 3, "h2", false); + allocator.sendFailure(f2); + + // Test ignore blacklisting re-enabled + // Known=4, blacklisted=2, ignore should be true. Should assign 2 + // containers. + assigned = + getContainerOnHost(jobId, 8, 1024, new String[] { "h1" }, + nodeManagers[0], dispatcher, allocator); + Assert.assertEquals("No of assignments must be 2", 2, assigned.size()); + + // Known=4, blacklisted=2, ignore should be true. + assigned = + getContainerOnHost(jobId, 9, 1024, new String[] { "h2" }, + nodeManagers[1], dispatcher, allocator); + Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); + + // Test blacklist while ignore blacklisting enabled + ContainerFailedEvent f3 = createFailEvent(jobId, 4, "h3", false); + allocator.sendFailure(f3); + + nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); + // Known=5, blacklisted=3, ignore should be true. + assigned = + getContainerOnHost(jobId, 10, 1024, new String[] { "h3" }, + nodeManagers[2], dispatcher, allocator); + Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); + + // Assign on 5 more nodes - to re-enable blacklisting + for (int i = 0; i < 5; i++) { + nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); + assigned = + getContainerOnHost(jobId, 11 + i, 1024, + new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i], + dispatcher, allocator); + Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); + } + + // Test h3 (blacklisted while ignoring blacklisting) is blacklisted. 
+ assigned = + getContainerOnHost(jobId, 20, 1024, new String[] { "h3" }, + nodeManagers[2], dispatcher, allocator); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + } + + private MockNM registerNodeManager(int i, MyResourceManager rm, + DrainDispatcher dispatcher) throws Exception { + MockNM nm = rm.registerNode("h" + (i + 1) + ":1234", 10240); + dispatcher.await(); + return nm; + } + + private + List getContainerOnHost(JobId jobId, + int taskAttemptId, int memory, String[] hosts, MockNM mockNM, + DrainDispatcher dispatcher, MyContainerAllocator allocator) + throws Exception { + ContainerRequestEvent reqEvent = + createReq(jobId, taskAttemptId, memory, hosts); + allocator.sendRequest(reqEvent); + + // Send the request to the RM + List assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + + // Heartbeat from the required nodeManager + mockNM.nodeHeartbeat(true); + dispatcher.await(); + + assigned = allocator.schedule(); + dispatcher.await(); + return assigned; + } + @Test public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { LOG.info("Running testBlackListedNodesWithSchedulingToThatNode"); @@ -587,6 +758,8 @@ public class TestRMContainerAllocator { Configuration conf = new Configuration(); conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true); conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1); + conf.setInt( + MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1); MyResourceManager rm = new MyResourceManager(conf); rm.start(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 621787981d1..9dfa8f633ef 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -348,8 +348,14 @@ public interface MRJobConfig { /** Enable blacklisting of nodes in the job.*/ public static final String MR_AM_JOB_NODE_BLACKLISTING_ENABLE = - MR_AM_PREFIX + "job.node.blacklisting.enable"; + MR_AM_PREFIX + "job.node-blacklisting.enable"; + /** Ignore blacklisting if a certain percentage of nodes have been blacklisted */ + public static final String MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT = + MR_AM_PREFIX + "job.node-blacklisting.ignore-threshold-node-percent"; + public static final int DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT = + 33; + /** Enable job recovery.*/ public static final String MR_AM_JOB_RECOVERY_ENABLE = MR_AM_PREFIX + "job.recovery.enable"; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 267a252918c..a9ef8994201 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -61,4 +61,17 @@ public interface AllocateResponse { @Private @Unstable public abstract void setAMResponse(AMResponse amResponse); + + + /** + * Get the number of hosts available on the cluster. + * @return the available host count. 
+ */ + @Public + @Stable + public int getNumClusterNodes(); + + @Private + @Unstable + public void setNumClusterNodes(int numNodes); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index 80946c1993d..971f23a7717 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -29,7 +29,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBui -public class AllocateResponsePBImpl extends ProtoBase implements AllocateResponse { +public class AllocateResponsePBImpl extends ProtoBase + implements AllocateResponse { AllocateResponseProto proto = AllocateResponseProto.getDefaultInstance(); AllocateResponseProto.Builder builder = null; boolean viaProto = false; @@ -95,7 +96,20 @@ public class AllocateResponsePBImpl extends ProtoBase imp builder.clearAMResponse(); this.amResponse = aMResponse; } + + @Override + public int getNumClusterNodes() { + AllocateResponseProtoOrBuilder p = viaProto ? 
proto : builder; + return p.getNumClusterNodes(); + } + + @Override + public void setNumClusterNodes(int numNodes) { + maybeInitBuilder(); + builder.setNumClusterNodes(numNodes); + } + private AMResponsePBImpl convertFromProtoFormat(AMResponseProto p) { return new AMResponsePBImpl(p); } @@ -103,7 +117,4 @@ public class AllocateResponsePBImpl extends ProtoBase imp private AMResponseProto convertToProtoFormat(AMResponse t) { return ((AMResponsePBImpl)t).getProto(); } - - - } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 99ca8b782d5..5444185b7ee 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -59,6 +59,7 @@ message AllocateRequestProto { message AllocateResponseProto { optional AMResponseProto AM_response = 1; + optional int32 num_cluster_nodes = 2; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 175542cc2a7..2de40440246 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -285,6 +285,7 @@ public class ApplicationMasterService extends AbstractService implements 
response.setAvailableResources(allocation.getResourceLimit()); responseMap.put(appAttemptId, response); allocateResponse.setAMResponse(response); + allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); return allocateResponse; } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 7f7e994e7b4..f0846497235 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -79,6 +79,14 @@ public interface YarnScheduler extends EventHandler { @Stable public Resource getMaximumResourceCapability(); + /** + * Get the number of nodes available in the cluster. + * @return the number of available nodes. + */ + @Public + @Stable + public int getNumClusterNodes(); + /** * The main api between the ApplicationMaster and the Scheduler. 
* The ApplicationMaster is updating his future resource requirements diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 5f080f83645..8aa85c3ab11 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -159,6 +159,7 @@ implements ResourceScheduler, CapacitySchedulerContext { return maximumAllocation; } + @Override public synchronized int getNumClusterNodes() { return numNodeManagers; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index c90566bd5b5..8e274956b57 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; import 
java.io.IOException; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -36,7 +35,6 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.Lock; @@ -179,6 +177,11 @@ public class FifoScheduler implements ResourceScheduler { return minimumAllocation; } + @Override + public int getNumClusterNodes() { + return nodes.size(); + } + @Override public Resource getMaximumResourceCapability() { return maximumAllocation;