YARN-8824. App Nodelabel missed after RM restart for finished apps. Contributed by Bibin A Chundatt.
This commit is contained in:
parent
3f6195045e
commit
e5287a4fe0
|
@ -307,6 +307,7 @@ public abstract class RMStateStore extends AbstractService {
|
|||
context.setApplicationTags(srcCtx.getApplicationTags());
|
||||
context.setApplicationType(srcCtx.getApplicationType());
|
||||
context.setUnmanagedAM(srcCtx.getUnmanagedAM());
|
||||
context.setNodeLabelExpression(srcCtx.getNodeLabelExpression());
|
||||
ContainerLaunchContextPBImpl amContainerSpec =
|
||||
new ContainerLaunchContextPBImpl();
|
||||
amContainerSpec.setApplicationACLs(
|
||||
|
|
|
@ -105,6 +105,7 @@ public class MockMemoryRMStateStore extends MemoryRMStateStore {
|
|||
oldAppSubCtxt.getAMContainerResourceRequests());
|
||||
context.setLogAggregationContext(oldAppSubCtxt.getLogAggregationContext());
|
||||
context.setApplicationType(oldAppSubCtxt.getApplicationType());
|
||||
context.setNodeLabelExpression(oldAppSubCtxt.getNodeLabelExpression());
|
||||
this.appSubCtxtCopy.put(oldAppSubCtxt.getApplicationId(), context);
|
||||
}
|
||||
|
||||
|
|
|
@ -722,6 +722,17 @@ public class MockRM extends ResourceManager {
|
|||
amResourceRequests.get(0).getNodeLabelExpression(), null, null);
|
||||
}
|
||||
|
||||
public RMApp submitApp(List<ResourceRequest> amResourceRequests,
|
||||
String appNodeLabel) throws Exception {
|
||||
return submitApp(amResourceRequests, "app1", "user", null, false, null,
|
||||
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
|
||||
false, false, null, 0, null, true,
|
||||
amResourceRequests.get(0).getPriority(),
|
||||
amResourceRequests.get(0).getNodeLabelExpression(), null, null, null,
|
||||
appNodeLabel);
|
||||
}
|
||||
|
||||
public RMApp submitApp(Resource capability, String name, String user,
|
||||
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
||||
int maxAppAttempts, Credentials ts, String appType,
|
||||
|
@ -771,6 +782,23 @@ public class MockRM extends ResourceManager {
|
|||
boolean cancelTokensWhenComplete, Priority priority, String amLabel,
|
||||
Map<ApplicationTimeoutType, Long> applicationTimeouts,
|
||||
ByteBuffer tokensConf, Set<String> applicationTags) throws Exception {
|
||||
return submitApp(amResourceRequests, name, user, acls, unmanaged, queue,
|
||||
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
|
||||
isAppIdProvided, applicationId, attemptFailuresValidityInterval,
|
||||
logAggregationContext, cancelTokensWhenComplete, priority, amLabel,
|
||||
applicationTimeouts, tokensConf, applicationTags, null);
|
||||
}
|
||||
|
||||
public RMApp submitApp(List<ResourceRequest> amResourceRequests, String name,
|
||||
String user, Map<ApplicationAccessType, String> acls, boolean unmanaged,
|
||||
String queue, int maxAppAttempts, Credentials ts, String appType,
|
||||
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
|
||||
ApplicationId applicationId, long attemptFailuresValidityInterval,
|
||||
LogAggregationContext logAggregationContext,
|
||||
boolean cancelTokensWhenComplete, Priority priority, String amLabel,
|
||||
Map<ApplicationTimeoutType, Long> applicationTimeouts,
|
||||
ByteBuffer tokensConf, Set<String> applicationTags, String appNodeLabel)
|
||||
throws Exception {
|
||||
ApplicationId appId = isAppIdProvided ? applicationId : null;
|
||||
ApplicationClientProtocol client = getClientRMService();
|
||||
if (! isAppIdProvided) {
|
||||
|
@ -801,6 +829,9 @@ public class MockRM extends ResourceManager {
|
|||
if (priority != null) {
|
||||
sub.setPriority(priority);
|
||||
}
|
||||
if (appNodeLabel != null) {
|
||||
sub.setNodeLabelExpression(appNodeLabel);
|
||||
}
|
||||
sub.setApplicationType(appType);
|
||||
ContainerLaunchContext clc = Records
|
||||
.newRecord(ContainerLaunchContext.class);
|
||||
|
|
|
@ -80,6 +80,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
|
|||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
|
@ -88,6 +89,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
|
@ -456,6 +458,49 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||
Assert.assertEquals(4, rmAppState.size());
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testAppReportNodeLabelRMRestart() throws Exception {
|
||||
if (getSchedulerType() != SchedulerType.CAPACITY) {
|
||||
return;
|
||||
}
|
||||
// Create RM
|
||||
YarnConfiguration newConf = new YarnConfiguration(conf);
|
||||
newConf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
||||
MockRM rm1 = createMockRM(newConf);
|
||||
NodeLabel amLabel = NodeLabel.newInstance("AMLABEL");
|
||||
NodeLabel appLabel = NodeLabel.newInstance("APPLABEL");
|
||||
List<NodeLabel> labels = new ArrayList<>();
|
||||
labels.add(amLabel);
|
||||
labels.add(appLabel);
|
||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||
rm1.start();
|
||||
// Add label
|
||||
rm1.getAdminService().addToClusterNodeLabels(
|
||||
AddToClusterNodeLabelsRequest.newInstance(labels));
|
||||
// create app and launch the AM
|
||||
ResourceRequest amResourceRequest = ResourceRequest
|
||||
.newInstance(Priority.newInstance(0), ResourceRequest.ANY,
|
||||
Resource.newInstance(200, 1), 1, true, amLabel.getName());
|
||||
ArrayList resReqs = new ArrayList<>();
|
||||
resReqs.add(amResourceRequest);
|
||||
RMApp app0 = rm1.submitApp(resReqs, appLabel.getName());
|
||||
rm1.killApp(app0.getApplicationId());
|
||||
rm1.waitForState(app0.getApplicationId(), RMAppState.KILLED);
|
||||
// start new RM
|
||||
MockRM rm2 = createMockRM(conf, memStore);
|
||||
rm2.start();
|
||||
Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
|
||||
ApplicationReport appReport = rm2.getClientRMService().getApplicationReport(
|
||||
GetApplicationReportRequest.newInstance(app0.getApplicationId()))
|
||||
.getApplicationReport();
|
||||
Assert
|
||||
.assertEquals(amLabel.getName(), appReport.getAmNodeLabelExpression());
|
||||
Assert.assertEquals(appLabel.getName(),
|
||||
appReport.getAppNodeLabelExpression());
|
||||
rm1.stop();
|
||||
rm2.stop();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testUnManagedRMRestart() throws Exception {
|
||||
// Create RM
|
||||
|
@ -471,6 +516,10 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||
MockRM rm2 = createMockRM(conf, memStore);
|
||||
rm2.start();
|
||||
Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
|
||||
ApplicationReport appReport = rm2.getClientRMService().getApplicationReport(
|
||||
GetApplicationReportRequest.newInstance(app0.getApplicationId()))
|
||||
.getApplicationReport();
|
||||
Assert.assertEquals(true, appReport.isUnmanagedApp());
|
||||
rm1.stop();
|
||||
rm2.stop();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue