YARN-8824. App Nodelabel missed after RM restart for finished apps. Contributed by Bibin A Chundatt.
This commit is contained in:
parent
7dbdb7560d
commit
d473152e6a
|
@ -306,6 +306,7 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
context.setApplicationTags(srcCtx.getApplicationTags());
|
context.setApplicationTags(srcCtx.getApplicationTags());
|
||||||
context.setApplicationType(srcCtx.getApplicationType());
|
context.setApplicationType(srcCtx.getApplicationType());
|
||||||
context.setUnmanagedAM(srcCtx.getUnmanagedAM());
|
context.setUnmanagedAM(srcCtx.getUnmanagedAM());
|
||||||
|
context.setNodeLabelExpression(srcCtx.getNodeLabelExpression());
|
||||||
ContainerLaunchContextPBImpl amContainerSpec =
|
ContainerLaunchContextPBImpl amContainerSpec =
|
||||||
new ContainerLaunchContextPBImpl();
|
new ContainerLaunchContextPBImpl();
|
||||||
amContainerSpec.setApplicationACLs(
|
amContainerSpec.setApplicationACLs(
|
||||||
|
|
|
@ -105,6 +105,7 @@ public class MockMemoryRMStateStore extends MemoryRMStateStore {
|
||||||
oldAppSubCtxt.getAMContainerResourceRequests());
|
oldAppSubCtxt.getAMContainerResourceRequests());
|
||||||
context.setLogAggregationContext(oldAppSubCtxt.getLogAggregationContext());
|
context.setLogAggregationContext(oldAppSubCtxt.getLogAggregationContext());
|
||||||
context.setApplicationType(oldAppSubCtxt.getApplicationType());
|
context.setApplicationType(oldAppSubCtxt.getApplicationType());
|
||||||
|
context.setNodeLabelExpression(oldAppSubCtxt.getNodeLabelExpression());
|
||||||
this.appSubCtxtCopy.put(oldAppSubCtxt.getApplicationId(), context);
|
this.appSubCtxtCopy.put(oldAppSubCtxt.getApplicationId(), context);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -700,13 +700,23 @@ public class MockRM extends ResourceManager {
|
||||||
|
|
||||||
public RMApp submitApp(List<ResourceRequest> amResourceRequests)
|
public RMApp submitApp(List<ResourceRequest> amResourceRequests)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
return submitApp(amResourceRequests, "app1",
|
return submitApp(amResourceRequests, "app1", "user", null, false, null,
|
||||||
"user", null, false, null,
|
|
||||||
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
|
||||||
false, false, null, 0, null, true,
|
false, false, null, 0, null, true,
|
||||||
amResourceRequests.get(0).getPriority(),
|
amResourceRequests.get(0).getPriority(),
|
||||||
amResourceRequests.get(0).getNodeLabelExpression(), null, null);
|
amResourceRequests.get(0).getNodeLabelExpression(), null, null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RMApp submitApp(List<ResourceRequest> amResourceRequests,
|
||||||
|
String appNodeLabel) throws Exception {
|
||||||
|
return submitApp(amResourceRequests, "app1", "user", null, false, null,
|
||||||
|
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
|
||||||
|
false, false, null, 0, null, true,
|
||||||
|
amResourceRequests.get(0).getPriority(),
|
||||||
|
amResourceRequests.get(0).getNodeLabelExpression(), null, null,
|
||||||
|
appNodeLabel);
|
||||||
}
|
}
|
||||||
|
|
||||||
public RMApp submitApp(Resource capability, String name, String user,
|
public RMApp submitApp(Resource capability, String name, String user,
|
||||||
|
@ -730,7 +740,7 @@ public class MockRM extends ResourceManager {
|
||||||
keepContainers, isAppIdProvided, applicationId,
|
keepContainers, isAppIdProvided, applicationId,
|
||||||
attemptFailuresValidityInterval, logAggregationContext,
|
attemptFailuresValidityInterval, logAggregationContext,
|
||||||
cancelTokensWhenComplete, priority, amLabel, applicationTimeouts,
|
cancelTokensWhenComplete, priority, amLabel, applicationTimeouts,
|
||||||
tokensConf);
|
tokensConf, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public RMApp submitApp(List<ResourceRequest> amResourceRequests, String name,
|
public RMApp submitApp(List<ResourceRequest> amResourceRequests, String name,
|
||||||
|
@ -741,8 +751,7 @@ public class MockRM extends ResourceManager {
|
||||||
LogAggregationContext logAggregationContext,
|
LogAggregationContext logAggregationContext,
|
||||||
boolean cancelTokensWhenComplete, Priority priority, String amLabel,
|
boolean cancelTokensWhenComplete, Priority priority, String amLabel,
|
||||||
Map<ApplicationTimeoutType, Long> applicationTimeouts,
|
Map<ApplicationTimeoutType, Long> applicationTimeouts,
|
||||||
ByteBuffer tokensConf)
|
ByteBuffer tokensConf, String appNodeLabel) throws Exception {
|
||||||
throws Exception {
|
|
||||||
ApplicationId appId = isAppIdProvided ? applicationId : null;
|
ApplicationId appId = isAppIdProvided ? applicationId : null;
|
||||||
ApplicationClientProtocol client = getClientRMService();
|
ApplicationClientProtocol client = getClientRMService();
|
||||||
if (! isAppIdProvided) {
|
if (! isAppIdProvided) {
|
||||||
|
@ -770,6 +779,9 @@ public class MockRM extends ResourceManager {
|
||||||
if (priority != null) {
|
if (priority != null) {
|
||||||
sub.setPriority(priority);
|
sub.setPriority(priority);
|
||||||
}
|
}
|
||||||
|
if (appNodeLabel != null) {
|
||||||
|
sub.setNodeLabelExpression(appNodeLabel);
|
||||||
|
}
|
||||||
sub.setApplicationType(appType);
|
sub.setApplicationType(appType);
|
||||||
ContainerLaunchContext clc = Records
|
ContainerLaunchContext clc = Records
|
||||||
.newRecord(ContainerLaunchContext.class);
|
.newRecord(ContainerLaunchContext.class);
|
||||||
|
|
|
@ -80,6 +80,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
@ -88,6 +89,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
|
@ -456,6 +458,49 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
Assert.assertEquals(4, rmAppState.size());
|
Assert.assertEquals(4, rmAppState.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testAppReportNodeLabelRMRestart() throws Exception {
|
||||||
|
if (getSchedulerType() != SchedulerType.CAPACITY) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Create RM
|
||||||
|
YarnConfiguration newConf = new YarnConfiguration(conf);
|
||||||
|
newConf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
||||||
|
MockRM rm1 = createMockRM(newConf);
|
||||||
|
NodeLabel amLabel = NodeLabel.newInstance("AMLABEL");
|
||||||
|
NodeLabel appLabel = NodeLabel.newInstance("APPLABEL");
|
||||||
|
List<NodeLabel> labels = new ArrayList<>();
|
||||||
|
labels.add(amLabel);
|
||||||
|
labels.add(appLabel);
|
||||||
|
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||||
|
rm1.start();
|
||||||
|
// Add label
|
||||||
|
rm1.getAdminService().addToClusterNodeLabels(
|
||||||
|
AddToClusterNodeLabelsRequest.newInstance(labels));
|
||||||
|
// create app and launch the AM
|
||||||
|
ResourceRequest amResourceRequest = ResourceRequest
|
||||||
|
.newInstance(Priority.newInstance(0), ResourceRequest.ANY,
|
||||||
|
Resource.newInstance(200, 1), 1, true, amLabel.getName());
|
||||||
|
ArrayList resReqs = new ArrayList<>();
|
||||||
|
resReqs.add(amResourceRequest);
|
||||||
|
RMApp app0 = rm1.submitApp(resReqs, appLabel.getName());
|
||||||
|
rm1.killApp(app0.getApplicationId());
|
||||||
|
rm1.waitForState(app0.getApplicationId(), RMAppState.KILLED);
|
||||||
|
// start new RM
|
||||||
|
MockRM rm2 = createMockRM(conf, memStore);
|
||||||
|
rm2.start();
|
||||||
|
Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
|
||||||
|
ApplicationReport appReport = rm2.getClientRMService().getApplicationReport(
|
||||||
|
GetApplicationReportRequest.newInstance(app0.getApplicationId()))
|
||||||
|
.getApplicationReport();
|
||||||
|
Assert
|
||||||
|
.assertEquals(amLabel.getName(), appReport.getAmNodeLabelExpression());
|
||||||
|
Assert.assertEquals(appLabel.getName(),
|
||||||
|
appReport.getAppNodeLabelExpression());
|
||||||
|
rm1.stop();
|
||||||
|
rm2.stop();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testUnManagedRMRestart() throws Exception {
|
public void testUnManagedRMRestart() throws Exception {
|
||||||
// Create RM
|
// Create RM
|
||||||
|
@ -471,6 +516,10 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
MockRM rm2 = createMockRM(conf, memStore);
|
MockRM rm2 = createMockRM(conf, memStore);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
|
Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
|
||||||
|
ApplicationReport appReport = rm2.getClientRMService().getApplicationReport(
|
||||||
|
GetApplicationReportRequest.newInstance(app0.getApplicationId()))
|
||||||
|
.getApplicationReport();
|
||||||
|
Assert.assertEquals(true, appReport.isUnmanagedApp());
|
||||||
rm1.stop();
|
rm1.stop();
|
||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue