YARN-7958. ServiceMaster should only wait for recovery of containers whose IDs match the current application ID. Contributed by Chandni Singh.
This commit is contained in:
parent
55d04a6db1
commit
5ed689e33a
|
@ -40,6 +40,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.service.CompositeService;
|
import org.apache.hadoop.service.CompositeService;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
@ -360,17 +361,22 @@ public class ServiceScheduler extends CompositeService {
|
||||||
amRMClient.releaseAssignedContainer(container.getId());
|
amRMClient.releaseAssignedContainer(container.getId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ApplicationId appId = ApplicationId.fromString(app.getId());
|
||||||
existingRecords.forEach((encodedContainerId, record) -> {
|
existingRecords.forEach((encodedContainerId, record) -> {
|
||||||
String componentName = record.get(YarnRegistryAttributes.YARN_COMPONENT);
|
String componentName = record.get(YarnRegistryAttributes.YARN_COMPONENT);
|
||||||
if (componentName != null) {
|
if (componentName != null) {
|
||||||
Component component = componentsByName.get(componentName);
|
Component component = componentsByName.get(componentName);
|
||||||
ComponentInstance compInstance = component.getComponentInstance(
|
if (component != null) {
|
||||||
record.description);
|
ComponentInstance compInstance = component.getComponentInstance(
|
||||||
ContainerId containerId = ContainerId.fromString(record.get(
|
record.description);
|
||||||
YarnRegistryAttributes.YARN_ID));
|
ContainerId containerId = ContainerId.fromString(record.get(
|
||||||
unRecoveredInstances.put(containerId, compInstance);
|
YarnRegistryAttributes.YARN_ID));
|
||||||
component.removePendingInstance(compInstance);
|
if (containerId.getApplicationAttemptId().getApplicationId()
|
||||||
|
.equals(appId)) {
|
||||||
|
unRecoveredInstances.put(containerId, compInstance);
|
||||||
|
component.removePendingInstance(compInstance);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -210,6 +210,49 @@ public class TestServiceAM extends ServiceTestUtils{
|
||||||
.getState());
|
.getState());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test to verify that the AM doesn't wait for containers of a different app
|
||||||
|
// even though that app corresponds to the same service.
|
||||||
|
@Test(timeout = 200000)
|
||||||
|
public void testContainersFromDifferentApp()
|
||||||
|
throws Exception {
|
||||||
|
ApplicationId applicationId = ApplicationId.newInstance(
|
||||||
|
System.currentTimeMillis(), 1);
|
||||||
|
Service exampleApp = new Service();
|
||||||
|
exampleApp.setId(applicationId.toString());
|
||||||
|
exampleApp.setName("testContainersFromDifferentApp");
|
||||||
|
String comp1Name = "comp1";
|
||||||
|
String comp1InstName = "comp1-0";
|
||||||
|
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component compA =
|
||||||
|
createComponent(comp1Name, 1, "sleep");
|
||||||
|
exampleApp.addComponent(compA);
|
||||||
|
|
||||||
|
MockServiceAM am = new MockServiceAM(exampleApp);
|
||||||
|
ContainerId containerId = am.createContainerId(1);
|
||||||
|
// saves the container in the registry
|
||||||
|
am.feedRegistryComponent(containerId, comp1Name, comp1InstName);
|
||||||
|
|
||||||
|
ApplicationId changedAppId = ApplicationId.newInstance(
|
||||||
|
System.currentTimeMillis(), 2);
|
||||||
|
exampleApp.setId(changedAppId.toString());
|
||||||
|
am.init(conf);
|
||||||
|
am.start();
|
||||||
|
// 1 pending instance since the container in the registry belongs to a different
|
||||||
|
// app.
|
||||||
|
Assert.assertEquals(1,
|
||||||
|
am.getComponent(comp1Name).getPendingInstances().size());
|
||||||
|
|
||||||
|
am.feedContainerToComp(exampleApp, 1, comp1Name);
|
||||||
|
GenericTestUtils.waitFor(() -> am.getCompInstance(comp1Name, comp1InstName)
|
||||||
|
.getContainerStatus() != null, 2000, 200000);
|
||||||
|
|
||||||
|
Assert.assertEquals("container state",
|
||||||
|
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
|
||||||
|
am.getCompInstance(comp1Name, comp1InstName).getContainerStatus()
|
||||||
|
.getState());
|
||||||
|
am.stop();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testScheduleWithMultipleResourceTypes()
|
public void testScheduleWithMultipleResourceTypes()
|
||||||
throws TimeoutException, InterruptedException, IOException {
|
throws TimeoutException, InterruptedException, IOException {
|
||||||
|
|
Loading…
Reference in New Issue