YARN-1410. Added tests to validate that clients can fail-over to a new RM
after getting an application-ID but before submission and can still submit to the newly active RM with no issues. Contributed by Xuan Gong. svn merge --ignore-ancestry -c 1575478 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1575479 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
325f94b36a
commit
71a8953ae9
|
@ -260,6 +260,10 @@ Release 2.4.0 - UNRELEASED
|
||||||
utilization for local disks so as to be able to offline full disks. (Varun
|
utilization for local disks so as to be able to offline full disks. (Varun
|
||||||
Vasudev via vinodkv)
|
Vasudev via vinodkv)
|
||||||
|
|
||||||
|
YARN-1410. Added tests to validate that clients can fail-over to a new RM
|
||||||
|
after getting an application-ID but before submission and can still submit to
|
||||||
|
the newly active RM with no issues. (Xuan Gong via vinodkv)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -31,6 +31,9 @@ import org.apache.hadoop.yarn.util.Records;
|
||||||
* <p>The response sent by the <code>ResourceManager</code> to the client for
|
* <p>The response sent by the <code>ResourceManager</code> to the client for
|
||||||
* a request to get a new {@link ApplicationId} for submitting applications.</p>
|
* a request to get a new {@link ApplicationId} for submitting applications.</p>
|
||||||
*
|
*
|
||||||
|
* <p>Clients can submit an application with the returned
|
||||||
|
* {@link ApplicationId}.</p>
|
||||||
|
*
|
||||||
* @see ApplicationClientProtocol#getNewApplication(GetNewApplicationRequest)
|
* @see ApplicationClientProtocol#getNewApplication(GetNewApplicationRequest)
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
|
|
|
@ -0,0 +1,47 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.exceptions;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exception to be thrown when Client submit an application without
|
||||||
|
* providing {@link ApplicationId} in {@link ApplicationSubmissionContext}.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public class ApplicationIdNotProvidedException extends YarnException{
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 911754350L;
|
||||||
|
|
||||||
|
public ApplicationIdNotProvidedException(Throwable cause) {
|
||||||
|
super(cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ApplicationIdNotProvidedException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ApplicationIdNotProvidedException(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
}
|
||||||
|
}
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||||
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
|
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
|
||||||
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
|
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
|
@ -88,6 +89,11 @@ public abstract class YarnClient extends AbstractService {
|
||||||
* application has been submitted and accepted by the ResourceManager.
|
* application has been submitted and accepted by the ResourceManager.
|
||||||
* </p>
|
* </p>
|
||||||
*
|
*
|
||||||
|
* <p>
|
||||||
|
* Should provide an {@link ApplicationId} when submits a new application,
|
||||||
|
* otherwise, it will throw the {@link ApplicationIdNotProvidedException}
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
* @param appContext
|
* @param appContext
|
||||||
* {@link ApplicationSubmissionContext} containing all the details
|
* {@link ApplicationSubmissionContext} containing all the details
|
||||||
* needed to submit a new application
|
* needed to submit a new application
|
||||||
|
|
|
@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.client.api.AHSClient;
|
||||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||||
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
|
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
|
||||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
@ -172,15 +173,21 @@ public class YarnClientImpl extends YarnClient {
|
||||||
submitApplication(ApplicationSubmissionContext appContext)
|
submitApplication(ApplicationSubmissionContext appContext)
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
ApplicationId applicationId = appContext.getApplicationId();
|
ApplicationId applicationId = appContext.getApplicationId();
|
||||||
appContext.setApplicationId(applicationId);
|
if (applicationId == null) {
|
||||||
|
throw new ApplicationIdNotProvidedException(
|
||||||
|
"ApplicationId is not provided in ApplicationSubmissionContext");
|
||||||
|
}
|
||||||
SubmitApplicationRequest request =
|
SubmitApplicationRequest request =
|
||||||
Records.newRecord(SubmitApplicationRequest.class);
|
Records.newRecord(SubmitApplicationRequest.class);
|
||||||
request.setApplicationSubmissionContext(appContext);
|
request.setApplicationSubmissionContext(appContext);
|
||||||
|
|
||||||
|
//TODO: YARN-1763:Handle RM failovers during the submitApplication call.
|
||||||
rmClient.submitApplication(request);
|
rmClient.submitApplication(request);
|
||||||
|
|
||||||
int pollCount = 0;
|
int pollCount = 0;
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
|
//TODO: YARN-1764:Handle RM fail overs after the submitApplication call.
|
||||||
while (true) {
|
while (true) {
|
||||||
YarnApplicationState state =
|
YarnApplicationState state =
|
||||||
getApplicationReport(applicationId).getYarnApplicationState();
|
getApplicationReport(applicationId).getYarnApplicationState();
|
||||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||||
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
|
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
|
@ -110,6 +111,24 @@ public class TestYarnClient {
|
||||||
YarnApplicationState.FAILED,
|
YarnApplicationState.FAILED,
|
||||||
YarnApplicationState.KILLED
|
YarnApplicationState.KILLED
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Submit an application without ApplicationId provided
|
||||||
|
// Should get ApplicationIdNotProvidedException
|
||||||
|
ApplicationSubmissionContext contextWithoutApplicationId =
|
||||||
|
mock(ApplicationSubmissionContext.class);
|
||||||
|
try {
|
||||||
|
client.submitApplication(contextWithoutApplicationId);
|
||||||
|
Assert.fail("Should throw the ApplicationIdNotProvidedException");
|
||||||
|
} catch (YarnException e) {
|
||||||
|
Assert.assertTrue(e instanceof ApplicationIdNotProvidedException);
|
||||||
|
Assert.assertTrue(e.getMessage().contains(
|
||||||
|
"ApplicationId is not provided in ApplicationSubmissionContext"));
|
||||||
|
} catch (IOException e) {
|
||||||
|
Assert.fail("IOException is not expected.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Submit the application with applicationId provided
|
||||||
|
// Should be successful
|
||||||
for (int i = 0; i < exitStates.length; ++i) {
|
for (int i = 0; i < exitStates.length; ++i) {
|
||||||
ApplicationSubmissionContext context =
|
ApplicationSubmissionContext context =
|
||||||
mock(ApplicationSubmissionContext.class);
|
mock(ApplicationSubmissionContext.class);
|
||||||
|
|
|
@ -31,6 +31,8 @@ import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||||
|
@ -40,6 +42,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
|
@ -221,13 +224,24 @@ public class MockRM extends ResourceManager {
|
||||||
public RMApp submitApp(int masterMemory, String name, String user,
|
public RMApp submitApp(int masterMemory, String name, String user,
|
||||||
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
||||||
int maxAppAttempts, Credentials ts, String appType,
|
int maxAppAttempts, Credentials ts, String appType,
|
||||||
boolean waitForAccepted, boolean keepContainers)
|
boolean waitForAccepted, boolean keepContainers) throws Exception {
|
||||||
throws Exception {
|
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
|
||||||
|
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
|
||||||
|
false, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RMApp submitApp(int masterMemory, String name, String user,
|
||||||
|
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
||||||
|
int maxAppAttempts, Credentials ts, String appType,
|
||||||
|
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
|
||||||
|
ApplicationId applicationId) throws Exception {
|
||||||
|
ApplicationId appId = isAppIdProvided ? applicationId : null;
|
||||||
ApplicationClientProtocol client = getClientRMService();
|
ApplicationClientProtocol client = getClientRMService();
|
||||||
|
if (! isAppIdProvided) {
|
||||||
GetNewApplicationResponse resp = client.getNewApplication(Records
|
GetNewApplicationResponse resp = client.getNewApplication(Records
|
||||||
.newRecord(GetNewApplicationRequest.class));
|
.newRecord(GetNewApplicationRequest.class));
|
||||||
ApplicationId appId = resp.getApplicationId();
|
appId = resp.getApplicationId();
|
||||||
|
}
|
||||||
SubmitApplicationRequest req = Records
|
SubmitApplicationRequest req = Records
|
||||||
.newRecord(SubmitApplicationRequest.class);
|
.newRecord(SubmitApplicationRequest.class);
|
||||||
ApplicationSubmissionContext sub = Records
|
ApplicationSubmissionContext sub = Records
|
||||||
|
@ -502,4 +516,12 @@ public class MockRM extends ResourceManager {
|
||||||
return am;
|
return am;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ApplicationReport getApplicationReport(ApplicationId appId)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
ApplicationClientProtocol client = getClientRMService();
|
||||||
|
GetApplicationReportResponse response =
|
||||||
|
client.getApplicationReport(GetApplicationReportRequest
|
||||||
|
.newInstance(appId));
|
||||||
|
return response.getApplicationReport();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,193 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.ha.ClientBaseWithFixes;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
|
||||||
|
|
||||||
|
public class RMHATestBase extends ClientBaseWithFixes{
|
||||||
|
|
||||||
|
private static final int ZK_TIMEOUT_MS = 5000;
|
||||||
|
private static StateChangeRequestInfo requestInfo =
|
||||||
|
new StateChangeRequestInfo(
|
||||||
|
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
||||||
|
protected Configuration configuration = new YarnConfiguration();
|
||||||
|
static MockRM rm1 = null;
|
||||||
|
static MockRM rm2 = null;
|
||||||
|
Configuration confForRM1;
|
||||||
|
Configuration confForRM2;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||||
|
configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
|
||||||
|
configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||||
|
configuration.set(YarnConfiguration.RM_STORE,
|
||||||
|
ZKRMStateStore.class.getName());
|
||||||
|
configuration.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
|
||||||
|
configuration.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
|
||||||
|
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||||
|
configuration.set(YarnConfiguration.RM_CLUSTER_ID, "test-yarn-cluster");
|
||||||
|
int base = 100;
|
||||||
|
for (String confKey : YarnConfiguration
|
||||||
|
.getServiceAddressConfKeys(configuration)) {
|
||||||
|
configuration.set(HAUtil.addSuffix(confKey, "rm1"), "0.0.0.0:"
|
||||||
|
+ (base + 20));
|
||||||
|
configuration.set(HAUtil.addSuffix(confKey, "rm2"), "0.0.0.0:"
|
||||||
|
+ (base + 40));
|
||||||
|
base = base * 2;
|
||||||
|
}
|
||||||
|
confForRM1 = new Configuration(configuration);
|
||||||
|
confForRM1.set(YarnConfiguration.RM_HA_ID, "rm1");
|
||||||
|
confForRM2 = new Configuration(configuration);
|
||||||
|
confForRM2.set(YarnConfiguration.RM_HA_ID, "rm2");
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void teardown() {
|
||||||
|
if (rm1 != null) {
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
|
if (rm2 != null) {
|
||||||
|
rm2.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
|
||||||
|
throws Exception {
|
||||||
|
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||||
|
nm.nodeHeartbeat(true);
|
||||||
|
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
|
||||||
|
am.registerAppAttempt();
|
||||||
|
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
|
||||||
|
rm.waitForState(app.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
|
RMAppAttemptState.RUNNING);
|
||||||
|
return am;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void startRMs() throws IOException {
|
||||||
|
rm1 = new MockRM(confForRM1);
|
||||||
|
rm2 = new MockRM(confForRM2);
|
||||||
|
startRMs(rm1, confForRM1, rm2, confForRM2);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void startRMsWithCustomizedRMAppManager() throws IOException {
|
||||||
|
final Configuration conf1 = new Configuration(confForRM1);
|
||||||
|
|
||||||
|
rm1 = new MockRM(conf1) {
|
||||||
|
@Override
|
||||||
|
protected RMAppManager createRMAppManager() {
|
||||||
|
return new MyRMAppManager(this.rmContext, this.scheduler,
|
||||||
|
this.masterService, this.applicationACLsManager, conf1);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
rm2 = new MockRM(confForRM2);
|
||||||
|
|
||||||
|
startRMs(rm1, conf1, rm2, confForRM2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class MyRMAppManager extends RMAppManager {
|
||||||
|
|
||||||
|
private Configuration conf;
|
||||||
|
private RMContext rmContext;
|
||||||
|
|
||||||
|
public MyRMAppManager(RMContext context, YarnScheduler scheduler,
|
||||||
|
ApplicationMasterService masterService,
|
||||||
|
ApplicationACLsManager applicationACLsManager, Configuration conf) {
|
||||||
|
super(context, scheduler, masterService, applicationACLsManager, conf);
|
||||||
|
this.conf = conf;
|
||||||
|
this.rmContext = context;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void submitApplication(
|
||||||
|
ApplicationSubmissionContext submissionContext, long submitTime,
|
||||||
|
String user, boolean isRecovered, RMState state) throws YarnException {
|
||||||
|
//Do nothing, just add the application to RMContext
|
||||||
|
RMAppImpl application =
|
||||||
|
new RMAppImpl(submissionContext.getApplicationId(), this.rmContext,
|
||||||
|
this.conf, submissionContext.getApplicationName(), user,
|
||||||
|
submissionContext.getQueue(), submissionContext,
|
||||||
|
this.rmContext.getScheduler(),
|
||||||
|
this.rmContext.getApplicationMasterService(),
|
||||||
|
submitTime, submissionContext.getApplicationType(),
|
||||||
|
submissionContext.getApplicationTags());
|
||||||
|
this.rmContext.getRMApps().put(submissionContext.getApplicationId(),
|
||||||
|
application);
|
||||||
|
//Do not send RMAppEventType.START event
|
||||||
|
//so the state of Application will not reach to NEW_SAVING state.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean isFinalState(RMAppState state) {
|
||||||
|
return state.equals(RMAppState.FINISHING)
|
||||||
|
|| state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
|
||||||
|
|| state.equals(RMAppState.KILLED);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void explicitFailover() throws IOException {
|
||||||
|
rm1.adminService.transitionToStandby(requestInfo);
|
||||||
|
rm2.adminService.transitionToActive(requestInfo);
|
||||||
|
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
|
||||||
|
== HAServiceState.STANDBY);
|
||||||
|
Assert.assertTrue(rm2.getRMContext().getHAServiceState()
|
||||||
|
== HAServiceState.ACTIVE);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void startRMs(MockRM rm1, Configuration confForRM1, MockRM rm2,
|
||||||
|
Configuration confForRM2) throws IOException {
|
||||||
|
rm1.init(confForRM1);
|
||||||
|
rm1.start();
|
||||||
|
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
|
||||||
|
== HAServiceState.STANDBY);
|
||||||
|
|
||||||
|
rm2.init(confForRM2);
|
||||||
|
rm2.start();
|
||||||
|
Assert.assertTrue(rm2.getRMContext().getHAServiceState()
|
||||||
|
== HAServiceState.STANDBY);
|
||||||
|
|
||||||
|
rm1.adminService.transitionToActive(requestInfo);
|
||||||
|
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
|
||||||
|
== HAServiceState.ACTIVE);
|
||||||
|
}
|
||||||
|
}
|
|
@ -24,86 +24,29 @@ import java.io.IOException;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.ha.ClientBaseWithFixes;
|
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
public class TestKillApplicationWithRMHA extends ClientBaseWithFixes{
|
public class TestKillApplicationWithRMHA extends RMHATestBase{
|
||||||
|
|
||||||
public static final Log LOG = LogFactory
|
public static final Log LOG = LogFactory
|
||||||
.getLog(TestKillApplicationWithRMHA.class);
|
.getLog(TestKillApplicationWithRMHA.class);
|
||||||
private static final int ZK_TIMEOUT_MS = 5000;
|
|
||||||
private static StateChangeRequestInfo requestInfo =
|
|
||||||
new StateChangeRequestInfo(
|
|
||||||
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
|
||||||
private Configuration configuration = new YarnConfiguration();
|
|
||||||
static MockRM rm1 = null;
|
|
||||||
static MockRM rm2 = null;
|
|
||||||
Configuration confForRM1;
|
|
||||||
Configuration confForRM2;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setup() throws Exception {
|
|
||||||
configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
|
||||||
configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
|
|
||||||
configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
|
||||||
configuration.set(YarnConfiguration.RM_STORE,
|
|
||||||
ZKRMStateStore.class.getName());
|
|
||||||
configuration.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
|
|
||||||
configuration.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
|
|
||||||
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
|
||||||
configuration.set(YarnConfiguration.RM_CLUSTER_ID, "test-yarn-cluster");
|
|
||||||
int base = 100;
|
|
||||||
for (String confKey : YarnConfiguration
|
|
||||||
.getServiceAddressConfKeys(configuration)) {
|
|
||||||
configuration.set(HAUtil.addSuffix(confKey, "rm1"), "0.0.0.0:"
|
|
||||||
+ (base + 20));
|
|
||||||
configuration.set(HAUtil.addSuffix(confKey, "rm2"), "0.0.0.0:"
|
|
||||||
+ (base + 40));
|
|
||||||
base = base * 2;
|
|
||||||
}
|
|
||||||
confForRM1 = new Configuration(configuration);
|
|
||||||
confForRM1.set(YarnConfiguration.RM_HA_ID, "rm1");
|
|
||||||
confForRM2 = new Configuration(configuration);
|
|
||||||
confForRM2.set(YarnConfiguration.RM_HA_ID, "rm2");
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void teardown() {
|
|
||||||
if (rm1 != null) {
|
|
||||||
rm1.stop();
|
|
||||||
}
|
|
||||||
if (rm2 != null) {
|
|
||||||
rm2.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test (timeout = 20000)
|
@Test (timeout = 20000)
|
||||||
public void testKillAppWhenFailoverHappensAtNewState()
|
public void testKillAppWhenFailoverHappensAtNewState()
|
||||||
|
@ -221,18 +164,6 @@ public class TestKillApplicationWithRMHA extends ClientBaseWithFixes{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
|
|
||||||
throws Exception {
|
|
||||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
|
||||||
nm.nodeHeartbeat(true);
|
|
||||||
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
|
|
||||||
am.registerAppAttempt();
|
|
||||||
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
|
|
||||||
rm.waitForState(app.getCurrentAppAttempt().getAppAttemptId(),
|
|
||||||
RMAppAttemptState.RUNNING);
|
|
||||||
return am;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void failOverAndKillApp(ApplicationId appId,
|
private void failOverAndKillApp(ApplicationId appId,
|
||||||
ApplicationAttemptId appAttemptId, RMAppState initialRMAppState,
|
ApplicationAttemptId appAttemptId, RMAppState initialRMAppState,
|
||||||
RMAppAttemptState initialRMAppAttemptState,
|
RMAppAttemptState initialRMAppAttemptState,
|
||||||
|
@ -256,29 +187,6 @@ public class TestKillApplicationWithRMHA extends ClientBaseWithFixes{
|
||||||
killApplication(rm2, appId, null, initialRMAppState);
|
killApplication(rm2, appId, null, initialRMAppState);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void startRMs() throws IOException {
|
|
||||||
rm1 = new MockRM(confForRM1);
|
|
||||||
rm2 = new MockRM(confForRM2);
|
|
||||||
startRMs(rm1, confForRM1, rm2, confForRM2);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private void startRMsWithCustomizedRMAppManager() throws IOException {
|
|
||||||
final Configuration conf1 = new Configuration(confForRM1);
|
|
||||||
|
|
||||||
rm1 = new MockRM(conf1) {
|
|
||||||
@Override
|
|
||||||
protected RMAppManager createRMAppManager() {
|
|
||||||
return new MyRMAppManager(this.rmContext, this.scheduler,
|
|
||||||
this.masterService, this.applicationACLsManager, conf1);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
rm2 = new MockRM(confForRM2);
|
|
||||||
|
|
||||||
startRMs(rm1, conf1, rm2, confForRM2);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void startRMsWithCustomizedClientRMService() throws IOException {
|
private void startRMsWithCustomizedClientRMService() throws IOException {
|
||||||
final Configuration conf1 = new Configuration(confForRM1);
|
final Configuration conf1 = new Configuration(confForRM1);
|
||||||
|
|
||||||
|
@ -296,39 +204,6 @@ public class TestKillApplicationWithRMHA extends ClientBaseWithFixes{
|
||||||
startRMs(rm1, conf1, rm2, confForRM2);
|
startRMs(rm1, conf1, rm2, confForRM2);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class MyRMAppManager extends RMAppManager {
|
|
||||||
|
|
||||||
private Configuration conf;
|
|
||||||
private RMContext rmContext;
|
|
||||||
|
|
||||||
public MyRMAppManager(RMContext context, YarnScheduler scheduler,
|
|
||||||
ApplicationMasterService masterService,
|
|
||||||
ApplicationACLsManager applicationACLsManager, Configuration conf) {
|
|
||||||
super(context, scheduler, masterService, applicationACLsManager, conf);
|
|
||||||
this.conf = conf;
|
|
||||||
this.rmContext = context;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void submitApplication(
|
|
||||||
ApplicationSubmissionContext submissionContext, long submitTime,
|
|
||||||
String user, boolean isRecovered, RMState state) throws YarnException {
|
|
||||||
//Do nothing, just add the application to RMContext
|
|
||||||
RMAppImpl application =
|
|
||||||
new RMAppImpl(submissionContext.getApplicationId(), this.rmContext,
|
|
||||||
this.conf, submissionContext.getApplicationName(), user,
|
|
||||||
submissionContext.getQueue(), submissionContext,
|
|
||||||
this.rmContext.getScheduler(),
|
|
||||||
this.rmContext.getApplicationMasterService(),
|
|
||||||
submitTime, submissionContext.getApplicationType(),
|
|
||||||
submissionContext.getApplicationTags());
|
|
||||||
this.rmContext.getRMApps().put(submissionContext.getApplicationId(),
|
|
||||||
application);
|
|
||||||
//Do not send RMAppEventType.START event
|
|
||||||
//so the state of Application will not reach to NEW_SAVING state.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class MyClientRMService extends ClientRMService {
|
private static class MyClientRMService extends ClientRMService {
|
||||||
|
|
||||||
private RMContext rmContext;
|
private RMContext rmContext;
|
||||||
|
@ -366,21 +241,6 @@ public class TestKillApplicationWithRMHA extends ClientBaseWithFixes{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isFinalState(RMAppState state) {
|
|
||||||
return state.equals(RMAppState.FINISHING)
|
|
||||||
|| state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
|
|
||||||
|| state.equals(RMAppState.KILLED);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void explicitFailover() throws IOException {
|
|
||||||
rm1.adminService.transitionToStandby(requestInfo);
|
|
||||||
rm2.adminService.transitionToActive(requestInfo);
|
|
||||||
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
|
|
||||||
== HAServiceState.STANDBY);
|
|
||||||
Assert.assertTrue(rm2.getRMContext().getHAServiceState()
|
|
||||||
== HAServiceState.ACTIVE);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void killApplication(MockRM rm, ApplicationId appId,
|
private void killApplication(MockRM rm, ApplicationId appId,
|
||||||
ApplicationAttemptId appAttemptId, RMAppState rmAppState)
|
ApplicationAttemptId appAttemptId, RMAppState rmAppState)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
@ -396,21 +256,4 @@ public class TestKillApplicationWithRMHA extends ClientBaseWithFixes{
|
||||||
// no new attempt is created.
|
// no new attempt is created.
|
||||||
Assert.assertEquals(1, loadedApp0.getAppAttempts().size());
|
Assert.assertEquals(1, loadedApp0.getAppAttempts().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void startRMs(MockRM rm1, Configuration confForRM1, MockRM rm2,
|
|
||||||
Configuration confForRM2) throws IOException {
|
|
||||||
rm1.init(confForRM1);
|
|
||||||
rm1.start();
|
|
||||||
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
|
|
||||||
== HAServiceState.STANDBY);
|
|
||||||
|
|
||||||
rm2.init(confForRM2);
|
|
||||||
rm2.start();
|
|
||||||
Assert.assertTrue(rm2.getRMContext().getHAServiceState()
|
|
||||||
== HAServiceState.STANDBY);
|
|
||||||
|
|
||||||
rm1.adminService.transitionToActive(requestInfo);
|
|
||||||
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
|
|
||||||
== HAServiceState.ACTIVE);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,89 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
|
public class TestSubmitApplicationWithRMHA extends RMHATestBase{
|
||||||
|
|
||||||
|
public static final Log LOG = LogFactory
|
||||||
|
.getLog(TestSubmitApplicationWithRMHA.class);
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void
|
||||||
|
testHandleRMHABeforeSubmitApplicationCallWithSavedApplicationState()
|
||||||
|
throws Exception {
|
||||||
|
// start two RMs, and transit rm1 to active, rm2 to standby
|
||||||
|
startRMs();
|
||||||
|
|
||||||
|
// get a new applicationId from rm1
|
||||||
|
ApplicationId appId = rm1.getNewAppId().getApplicationId();
|
||||||
|
|
||||||
|
// Do the failover
|
||||||
|
explicitFailover();
|
||||||
|
|
||||||
|
// submit the application with previous assigned applicationId
|
||||||
|
// to current active rm: rm2
|
||||||
|
RMApp app1 =
|
||||||
|
rm2.submitApp(200, "", UserGroupInformation
|
||||||
|
.getCurrentUser().getShortUserName(), null, false, null,
|
||||||
|
configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
|
||||||
|
false, false, true, appId);
|
||||||
|
|
||||||
|
// verify application submission
|
||||||
|
verifySubmitApp(rm2, app1, appId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifySubmitApp(MockRM rm, RMApp app,
|
||||||
|
ApplicationId expectedAppId) throws Exception {
|
||||||
|
int maxWaittingTimes = 20;
|
||||||
|
int count = 0;
|
||||||
|
while (true) {
|
||||||
|
YarnApplicationState state =
|
||||||
|
rm.getApplicationReport(app.getApplicationId())
|
||||||
|
.getYarnApplicationState();
|
||||||
|
if (!state.equals(YarnApplicationState.NEW) &&
|
||||||
|
!state.equals(YarnApplicationState.NEW_SAVING)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (count > maxWaittingTimes) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Thread.sleep(200);
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
// Verify submittion is successful
|
||||||
|
Assert.assertFalse(rm.getApplicationReport(app.getApplicationId())
|
||||||
|
.getYarnApplicationState() == YarnApplicationState.NEW);
|
||||||
|
Assert.assertFalse(rm.getApplicationReport(app.getApplicationId())
|
||||||
|
.getYarnApplicationState() == YarnApplicationState.NEW_SAVING);
|
||||||
|
Assert.assertEquals(expectedAppId, app.getApplicationId());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue