YARN-8579. Recover NMToken of previous attempted component data.

Contributed by Gour Saha
This commit is contained in:
Eric Yang 2018-07-31 18:01:02 -04:00
parent 4b540bbfcf
commit c7ebcd76bf
4 changed files with 23 additions and 7 deletions

View File

@ -649,6 +649,7 @@ public class ServiceScheduler extends CompositeService {
@Override @Override
public void onContainersReceivedFromPreviousAttempts( public void onContainersReceivedFromPreviousAttempts(
List<Container> containers) { List<Container> containers) {
LOG.info("Containers recovered after AM registered: {}", containers);
if (containers == null || containers.isEmpty()) { if (containers == null || containers.isEmpty()) {
return; return;
} }

View File

@ -785,6 +785,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
List<Container> returnContainerList = new ArrayList<> List<Container> returnContainerList = new ArrayList<>
(recoveredPreviousAttemptContainers); (recoveredPreviousAttemptContainers);
recoveredPreviousAttemptContainers.clear(); recoveredPreviousAttemptContainers.clear();
updateNMTokens(returnContainerList);
return returnContainerList; return returnContainerList;
} finally { } finally {
writeLock.unlock(); writeLock.unlock();

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
@ -950,12 +951,15 @@ public class FairScheduler extends
Resource headroom = application.getHeadroom(); Resource headroom = application.getHeadroom();
application.setApplicationHeadroomForMetrics(headroom); application.setApplicationHeadroomForMetrics(headroom);
List<Container> previousAttemptContainers = application
.pullPreviousAttemptContainers();
List<NMToken> updatedNMTokens = application.pullUpdatedNMTokens();
return new Allocation(newlyAllocatedContainers, headroom, return new Allocation(newlyAllocatedContainers, headroom,
preemptionContainerIds, null, null, preemptionContainerIds, null, null,
application.pullUpdatedNMTokens(), null, null, updatedNMTokens, null, null,
application.pullNewlyPromotedContainers(), application.pullNewlyPromotedContainers(),
application.pullNewlyDemotedContainers(), application.pullNewlyDemotedContainers(),
application.pullPreviousAttemptContainers()); previousAttemptContainers);
} }
private List<MaxResourceValidationResult> validateResourceRequests( private List<MaxResourceValidationResult> validateResourceRequests(

View File

@ -1048,12 +1048,12 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
rm1.start(); rm1.start();
YarnScheduler scheduler = rm1.getResourceScheduler(); YarnScheduler scheduler = rm1.getResourceScheduler();
MockNM nm1 = new MockNM("127.0.0.1:1234", 10240, String nm1Address = "127.0.0.1:1234";
rm1.getResourceTrackerService()); MockNM nm1 = new MockNM(nm1Address, 10240, rm1.getResourceTrackerService());
nm1.registerNode(); nm1.registerNode();
MockNM nm2 = new MockNM("127.0.0.1:2351", 4089, String nm2Address = "127.0.0.1:2351";
rm1.getResourceTrackerService()); MockNM nm2 = new MockNM(nm2Address, 4089, rm1.getResourceTrackerService());
nm2.registerNode(); nm2.registerNode();
RMApp app1 = rm1.submitApp(200, "name", "user", RMApp app1 = rm1.submitApp(200, "name", "user",
@ -1120,6 +1120,11 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
registerResponse.getContainersFromPreviousAttempts().size()); registerResponse.getContainersFromPreviousAttempts().size());
Assert.assertEquals("container 2", containerId2, Assert.assertEquals("container 2", containerId2,
registerResponse.getContainersFromPreviousAttempts().get(0).getId()); registerResponse.getContainersFromPreviousAttempts().get(0).getId());
List<NMToken> prevNMTokens = registerResponse
.getNMTokensFromPreviousAttempts();
Assert.assertEquals(1, prevNMTokens.size());
// container 2 is running on node 1
Assert.assertEquals(nm1Address, prevNMTokens.get(0).getNodeId().toString());
rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING); rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
@ -1145,6 +1150,11 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
allocateResponse.getContainersFromPreviousAttempts()); allocateResponse.getContainersFromPreviousAttempts());
Assert.assertEquals("new containers should not be allocated", Assert.assertEquals("new containers should not be allocated",
0, allocateResponse.getAllocatedContainers().size()); 0, allocateResponse.getAllocatedContainers().size());
List<NMToken> nmTokens = allocateResponse.getNMTokens();
Assert.assertEquals(1, nmTokens.size());
// container 3 is running on node 2
Assert.assertEquals(nm2Address,
nmTokens.get(0).getNodeId().toString());
return true; return true;
} }
} catch (Exception e) { } catch (Exception e) {