From c7ebcd76bf3dd14127336951f2be3de772e7826a Mon Sep 17 00:00:00 2001 From: Eric Yang Date: Tue, 31 Jul 2018 18:01:02 -0400 Subject: [PATCH] YARN-8579. Recover NMToken of previous attempted component data. Contributed by Gour Saha --- .../hadoop/yarn/service/ServiceScheduler.java | 1 + .../scheduler/SchedulerApplicationAttempt.java | 3 ++- .../scheduler/fair/FairScheduler.java | 8 ++++++-- .../applicationsmanager/TestAMRestart.java | 18 ++++++++++++++---- 4 files changed, 23 insertions(+), 7 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index cfaf3567b61..0801ad052db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -649,6 +649,7 @@ public class ServiceScheduler extends CompositeService { @Override public void onContainersReceivedFromPreviousAttempts( List containers) { + LOG.info("Containers recovered after AM registered: {}", containers); if (containers == null || containers.isEmpty()) { return; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index dd6d38f6025..f9df2b829b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -785,6 +785,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { List returnContainerList = new ArrayList<> (recoveredPreviousAttemptContainers); recoveredPreviousAttemptContainers.clear(); + updateNMTokens(returnContainerList); return returnContainerList; } finally { writeLock.unlock(); @@ -1466,4 +1467,4 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { public Map getApplicationSchedulingEnvs() { return this.applicationSchedulingEnvs; } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 20d1afe0b6b..037cebf1734 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -950,12 +951,15 @@ public class FairScheduler extends Resource headroom = application.getHeadroom(); application.setApplicationHeadroomForMetrics(headroom); + List previousAttemptContainers = application + .pullPreviousAttemptContainers(); + List updatedNMTokens = application.pullUpdatedNMTokens(); return new Allocation(newlyAllocatedContainers, headroom, preemptionContainerIds, null, null, - application.pullUpdatedNMTokens(), null, null, + updatedNMTokens, null, null, application.pullNewlyPromotedContainers(), application.pullNewlyDemotedContainers(), - application.pullPreviousAttemptContainers()); + previousAttemptContainers); } private List validateResourceRequests( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 4add1862ce2..9f122cb34bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -1048,12 +1048,12 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase { rm1.start(); YarnScheduler scheduler = rm1.getResourceScheduler(); - MockNM nm1 = new MockNM("127.0.0.1:1234", 10240, - rm1.getResourceTrackerService()); + String nm1Address = "127.0.0.1:1234"; + MockNM nm1 = new MockNM(nm1Address, 10240, rm1.getResourceTrackerService()); nm1.registerNode(); - MockNM nm2 = new MockNM("127.0.0.1:2351", 4089, - rm1.getResourceTrackerService()); + String nm2Address = "127.0.0.1:2351"; + MockNM nm2 = new MockNM(nm2Address, 4089, rm1.getResourceTrackerService()); nm2.registerNode(); RMApp app1 = rm1.submitApp(200, "name", "user", @@ -1120,6 +1120,11 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase { registerResponse.getContainersFromPreviousAttempts().size()); Assert.assertEquals("container 2", containerId2, registerResponse.getContainersFromPreviousAttempts().get(0).getId()); + List prevNMTokens = registerResponse + .getNMTokensFromPreviousAttempts(); + Assert.assertEquals(1, prevNMTokens.size()); + // container 2 is running on node 1 + Assert.assertEquals(nm1Address, prevNMTokens.get(0).getNodeId().toString()); rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING); @@ -1145,6 +1150,11 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase { allocateResponse.getContainersFromPreviousAttempts()); Assert.assertEquals("new containers should not be allocated", 0, allocateResponse.getAllocatedContainers().size()); + List nmTokens = allocateResponse.getNMTokens(); + Assert.assertEquals(1, nmTokens.size()); + // container 3 is running on node 2 + Assert.assertEquals(nm2Address, + nmTokens.get(0).getNodeId().toString()); return true; } } catch (Exception e) {