From 7640d62716b35f4cb23df61381af3ad9b997c09e Mon Sep 17 00:00:00 2001 From: Eric Yang Date: Tue, 31 Jul 2018 18:01:02 -0400 Subject: [PATCH] YARN-8579. Recover NMToken of previous attempted component data. Contributed by Gour Saha --- .../hadoop/yarn/service/ServiceScheduler.java | 1 + .../scheduler/SchedulerApplicationAttempt.java | 3 ++- .../scheduler/fair/FairScheduler.java | 9 +++++++-- .../applicationsmanager/TestAMRestart.java | 18 ++++++++++++++---- 4 files changed, 24 insertions(+), 7 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index cfaf3567b61..0801ad052db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -649,6 +649,7 @@ public class ServiceScheduler extends CompositeService { @Override public void onContainersReceivedFromPreviousAttempts( List containers) { + LOG.info("Containers recovered after AM registered: {}", containers); if (containers == null || containers.isEmpty()) { return; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 005569cf252..767df6d0ad4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -785,6 +785,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { List returnContainerList = new ArrayList<> (recoveredPreviousAttemptContainers); recoveredPreviousAttemptContainers.clear(); + updateNMTokens(returnContainerList); return returnContainerList; } finally { writeLock.unlock(); @@ -1471,4 +1472,4 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { public Map getApplicationSchedulingEnvs() { return this.applicationSchedulingEnvs; } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 13874bfae2b..caa6aa07567 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -912,12 +913,16 @@ public class FairScheduler extends Resource headroom = application.getHeadroom(); application.setApplicationHeadroomForMetrics(headroom); + + List previousAttemptContainers = application + .pullPreviousAttemptContainers(); + List updatedNMTokens = application.pullUpdatedNMTokens(); return new Allocation(newlyAllocatedContainers, headroom, preemptionContainerIds, null, null, - application.pullUpdatedNMTokens(), null, null, + updatedNMTokens, null, null, application.pullNewlyPromotedContainers(), application.pullNewlyDemotedContainers(), - application.pullPreviousAttemptContainers()); + previousAttemptContainers); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 4add1862ce2..9f122cb34bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -1048,12 +1048,12 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase { rm1.start(); YarnScheduler scheduler = rm1.getResourceScheduler(); - MockNM nm1 = new MockNM("127.0.0.1:1234", 10240, - rm1.getResourceTrackerService()); + String nm1Address = "127.0.0.1:1234"; + MockNM nm1 = new MockNM(nm1Address, 10240, rm1.getResourceTrackerService()); nm1.registerNode(); - MockNM nm2 = new MockNM("127.0.0.1:2351", 4089, - rm1.getResourceTrackerService()); + String nm2Address = "127.0.0.1:2351"; + MockNM nm2 = new MockNM(nm2Address, 4089, rm1.getResourceTrackerService()); nm2.registerNode(); RMApp app1 = rm1.submitApp(200, "name", "user", @@ -1120,6 +1120,11 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase { registerResponse.getContainersFromPreviousAttempts().size()); Assert.assertEquals("container 2", containerId2, registerResponse.getContainersFromPreviousAttempts().get(0).getId()); + List prevNMTokens = registerResponse + .getNMTokensFromPreviousAttempts(); + Assert.assertEquals(1, prevNMTokens.size()); + // container 2 is running on node 1 + Assert.assertEquals(nm1Address, prevNMTokens.get(0).getNodeId().toString()); rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING); @@ -1145,6 +1150,11 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase { allocateResponse.getContainersFromPreviousAttempts()); Assert.assertEquals("new containers should not be allocated", 0, allocateResponse.getAllocatedContainers().size()); + List nmTokens = allocateResponse.getNMTokens(); + Assert.assertEquals(1, nmTokens.size()); + // container 3 is running on node 2 + Assert.assertEquals(nm2Address, + nmTokens.get(0).getNodeId().toString()); return true; } } catch (Exception e) {