YARN-10538: Add RECOMMISSIONING nodes to the list of updated nodes returned to the AM (#2564)

Contributed by Srinivas S T
This commit is contained in:
srinivasst 2021-01-08 10:52:52 +05:30 committed by GitHub
parent b612c310c2
commit 1b1791075a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 61 additions and 0 deletions

View File

@ -1325,6 +1325,13 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
.handle( .handle(
new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption
.newInstance(rmNode.totalCapability, 0))); .newInstance(rmNode.totalCapability, 0)));
// Notify NodesListManager to notify all RMApp that this node has been
// recommissioned so that each Application Master can take any required
// actions.
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
} }
} }

View File

@ -22,6 +22,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.List; import java.util.List;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter; import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -91,6 +92,12 @@ public class TestAMRMRPCNodeUpdates {
rm.drainEvents(); rm.drainEvents();
} }
private void syncNodeRecommissioning(MockNM nm) throws Exception {
rm.sendNodeEvent(nm, RMNodeEventType.RECOMMISSION);
rm.waitForState(nm.getNodeId(), NodeState.RUNNING);
rm.drainEvents();
}
private AllocateResponse allocate(final ApplicationAttemptId attemptId, private AllocateResponse allocate(final ApplicationAttemptId attemptId,
final AllocateRequest req) throws Exception { final AllocateRequest req) throws Exception {
UserGroupInformation ugi = UserGroupInformation ugi =
@ -140,6 +147,53 @@ public class TestAMRMRPCNodeUpdates {
NodeUpdateType.NODE_DECOMMISSIONING, nr.getNodeUpdateType()); NodeUpdateType.NODE_DECOMMISSIONING, nr.getNodeUpdateType());
} }
@Test
public void testAMRMRecommissioningNodes() throws Exception {
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10000);
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 10000);
rm.drainEvents();
RMApp app1 = MockRMAppSubmitter.submitWithMemory(2000, rm);
// Trigger the scheduling so the AM gets 'launched' on nm1
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
// register AM returns no unusable node
am1.registerAppAttempt();
// DECOMMISSION nm2
Integer decommissioningTimeout = 600;
syncNodeGracefulDecommission(nm2, decommissioningTimeout);
AllocateRequest allocateRequest1 =
AllocateRequest.newInstance(0, 0F, null, null, null);
AllocateResponse response1 =
allocate(attempt1.getAppAttemptId(), allocateRequest1);
List<NodeReport> updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
NodeReport nr = updatedNodes.iterator().next();
Assert.assertEquals(
decommissioningTimeout, nr.getDecommissioningTimeout());
Assert.assertEquals(
NodeUpdateType.NODE_DECOMMISSIONING, nr.getNodeUpdateType());
// Wait for nm2 to RECOMMISSION
syncNodeRecommissioning(nm2);
AllocateRequest allocateRequest2 = AllocateRequest
.newInstance(response1.getResponseId(), 0F, null, null, null);
AllocateResponse response2 =
allocate(attempt1.getAppAttemptId(), allocateRequest2);
List<NodeReport> updatedNodes2 = response2.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes2.size());
NodeReport nr2 = updatedNodes2.iterator().next();
Assert.assertEquals(
NodeUpdateType.NODE_USABLE, nr2.getNodeUpdateType());
}
@Test @Test
public void testAMRMUnusableNodes() throws Exception { public void testAMRMUnusableNodes() throws Exception {