YARN-11323. [Federation] Improve ResourceManager Handler FinishApps. (#4954)

This commit is contained in:
slfan1989 2022-10-12 05:53:02 +08:00 committed by GitHub
parent 9e16f1f883
commit d78b0b39a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 596 additions and 0 deletions

View File

@ -4061,6 +4061,17 @@ public class YarnConfiguration extends Configuration {
public static final int DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS = 1000;
public static final String FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT =
FEDERATION_PREFIX + "state-store.clean-up-retry-count";
public static final int DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT = 1;
public static final String FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME =
FEDERATION_PREFIX + "state-store.clean-up-retry-sleep-time";
public static final long DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME =
TimeUnit.SECONDS.toMillis(1);
public static final String ROUTER_PREFIX = YARN_PREFIX + "router.";
public static final String ROUTER_BIND_HOST = ROUTER_PREFIX + "bind-host";

View File

@ -3727,6 +3727,26 @@
<value>yarnfederation/</value>
</property>
<property>
<description>
The number of retries to clear the app in the FederationStateStore,
the default value is 1, that is, after the app fails to clean up, it will retry the cleanup again.
</description>
<name>yarn.federation.state-store.clean-up-retry-count</name>
<value>1</value>
</property>
<property>
<description>
Clear the sleep time of App retry in FederationStateStore.
When the app fails to clean up,
it will sleep for a period of time and then try to clean up.
The default value is 1s.
</description>
<name>yarn.federation.state-store.clean-up-retry-sleep-time</name>
<value>1s</value>
</property>
<!-- Other Configuration -->
<property>

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.retry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public interface FederationActionRetry<T> {
Logger LOG = LoggerFactory.getLogger(FederationActionRetry.class);
T run() throws Exception;
default T runWithRetries(int retryCount, long retrySleepTime) throws Exception {
int retry = 0;
while (true) {
try {
return run();
} catch (Exception e) {
LOG.info("Exception while executing an Federation operation.", e);
if (++retry > retryCount) {
LOG.info("Maxed out Federation retries. Giving up!");
throw e;
}
LOG.info("Retrying operation on Federation. Retry no. {}", retry);
Thread.sleep(retrySleepTime);
}
}
}
}

View File

@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** Federation Retry Policies. **/
package org.apache.hadoop.yarn.server.federation.retry;

View File

@ -28,9 +28,11 @@ import java.util.concurrent.Future;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.security.ConfiguredYarnAuthorizer;
import org.apache.hadoop.yarn.security.Permission;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -114,6 +116,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
private boolean nodeLabelsEnabled;
private Set<String> exclusiveEnforcedPartitions;
private String amDefaultNodeLabel;
private FederationStateStoreService federationStateStoreService;
private static final String USER_ID_PREFIX = "userid=";
@ -347,6 +350,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
+ ", removing app " + removeApp.getApplicationId()
+ " from state store.");
rmContext.getStateStore().removeApplication(removeApp);
removeApplicationIdFromStateStore(removeId);
completedAppsInStateStore--;
}
@ -358,6 +362,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
+ this.maxCompletedAppsInMemory + ", removing app " + removeId
+ " from memory: ");
rmContext.getRMApps().remove(removeId);
removeApplicationIdFromStateStore(removeId);
this.applicationACLsManager.removeApplication(removeId);
}
}
@ -1054,4 +1059,42 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
context.setQueue(placementContext.getQueue());
}
}
@VisibleForTesting
public void setFederationStateStoreService(FederationStateStoreService stateStoreService) {
this.federationStateStoreService = stateStoreService;
}
/**
* Remove ApplicationId From StateStore.
*
* @param appId appId
*/
private void removeApplicationIdFromStateStore(ApplicationId appId) {
if (HAUtil.isFederationEnabled(conf) && federationStateStoreService != null) {
try {
boolean cleanUpResult =
federationStateStoreService.cleanUpFinishApplicationsWithRetries(appId, true);
if(cleanUpResult){
LOG.info("applicationId = {} remove from state store success.", appId);
} else {
LOG.warn("applicationId = {} remove from state store failed.", appId);
}
} catch (Exception e) {
LOG.error("applicationId = {} remove from state store error.", appId, e);
}
}
}
// just test using
@VisibleForTesting
public void checkAppNumCompletedLimit4Test() {
checkAppNumCompletedLimit();
}
// just test using
@VisibleForTesting
public void finishApplication4Test(ApplicationId applicationId) {
finishApplication(applicationId);
}
}

View File

@ -917,6 +917,7 @@ public class ResourceManager extends CompositeService
}
federationStateStoreService = createFederationStateStoreService();
addIfService(federationStateStoreService);
rmAppManager.setFederationStateStoreService(federationStateStoreService);
LOG.info("Initialized Federation membership.");
}
@ -996,6 +997,13 @@ public class ResourceManager extends CompositeService
RMState state = rmStore.loadState();
recover(state);
LOG.info("Recovery ended");
// Make sure that the App is cleaned up after the RM memory is restored.
if (HAUtil.isFederationEnabled(conf)) {
federationStateStoreService.
createCleanUpFinishApplicationThread("Recovery");
}
} catch (Exception e) {
// the Exception from loadState() needs to be handled for
// HA and we need to give up master status if we got fenced

View File

@ -20,8 +20,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.federation;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.List;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
@ -29,9 +32,11 @@ import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
@ -74,10 +79,12 @@ import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationH
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -102,6 +109,9 @@ public class FederationStateStoreService extends AbstractService
private long heartbeatInterval;
private long heartbeatInitialDelay;
private RMContext rmContext;
private String cleanUpThreadNamePrefix = "FederationStateStoreService-Clean-Thread";
private int cleanUpRetryCountNum;
private long cleanUpRetrySleepTime;
public FederationStateStoreService(RMContext rmContext) {
super(FederationStateStoreService.class.getName());
@ -149,6 +159,15 @@ public class FederationStateStoreService extends AbstractService
heartbeatInitialDelay =
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY;
}
cleanUpRetryCountNum = conf.getInt(YarnConfiguration.FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT);
cleanUpRetrySleepTime = conf.getTimeDuration(
YarnConfiguration.FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME,
TimeUnit.MILLISECONDS);
LOG.info("Initialized federation membership service.");
super.serviceInit(conf);
@ -378,4 +397,160 @@ public class FederationStateStoreService extends AbstractService
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
}
/**
* Create a thread that cleans up the app.
* @param stage rm-start/rm-stop.
*/
public void createCleanUpFinishApplicationThread(String stage) {
String threadName = cleanUpThreadNamePrefix + "-" + stage;
Thread finishApplicationThread = new Thread(createCleanUpFinishApplicationThread());
finishApplicationThread.setName(threadName);
finishApplicationThread.start();
LOG.info("CleanUpFinishApplicationThread has been started {}.", threadName);
}
/**
* Create a thread that cleans up the apps.
*
* @return thread object.
*/
private Runnable createCleanUpFinishApplicationThread() {
return () -> {
createCleanUpFinishApplication();
};
}
/**
* cleans up the apps.
*/
private void createCleanUpFinishApplication() {
try {
// Get the current RM's App list based on subClusterId
GetApplicationsHomeSubClusterRequest request =
GetApplicationsHomeSubClusterRequest.newInstance(subClusterId);
GetApplicationsHomeSubClusterResponse response =
getApplicationsHomeSubCluster(request);
List<ApplicationHomeSubCluster> applicationHomeSCs = response.getAppsHomeSubClusters();
// Traverse the app list and clean up the app.
long successCleanUpAppCount = 0;
// Save a local copy of the map so that it won't change with the map
Map<ApplicationId, RMApp> rmApps = new HashMap<>(this.rmContext.getRMApps());
// Need to make sure there is app list in RM memory.
if (rmApps != null && !rmApps.isEmpty()) {
for (ApplicationHomeSubCluster applicationHomeSC : applicationHomeSCs) {
ApplicationId applicationId = applicationHomeSC.getApplicationId();
if (!rmApps.containsKey(applicationId)) {
try {
Boolean cleanUpSuccess = cleanUpFinishApplicationsWithRetries(applicationId, false);
if (cleanUpSuccess) {
LOG.info("application = {} has been cleaned up successfully.", applicationId);
successCleanUpAppCount++;
}
} catch (Exception e) {
LOG.error("problem during application = {} cleanup.", applicationId, e);
}
}
}
}
// print app cleanup log
LOG.info("cleanup finished applications size = {}, number = {} successful cleanup.",
applicationHomeSCs.size(), successCleanUpAppCount);
} catch (Exception e) {
LOG.error("problem during cleanup applications.", e);
}
}
/**
* Clean up the federation completed Application.
*
* @param appId app id.
* @param isQuery true, need to query from statestore, false not query.
* @throws Exception exception occurs.
* @return true, successfully deleted; false, failed to delete or no need to delete
*/
public boolean cleanUpFinishApplicationsWithRetries(ApplicationId appId, boolean isQuery)
throws Exception {
// Generate a request to delete data
DeleteApplicationHomeSubClusterRequest request =
DeleteApplicationHomeSubClusterRequest.newInstance(appId);
// CleanUp Finish App.
return ((FederationActionRetry<Boolean>) () -> invokeCleanUpFinishApp(appId, isQuery, request))
.runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime);
}
/**
* CleanUp Finish App.
*
* @param applicationId app id.
* @param isQuery true, need to query from statestore, false not query.
* @param delRequest delete Application Request
* @return true, successfully deleted; false, failed to delete or no need to delete
* @throws YarnException
*/
private boolean invokeCleanUpFinishApp(ApplicationId applicationId, boolean isQuery,
DeleteApplicationHomeSubClusterRequest delRequest) throws YarnException {
boolean isAppNeedClean = true;
// If we need to query the StateStore
if (isQuery) {
isAppNeedClean = isApplicationNeedClean(applicationId);
}
// When the App needs to be cleaned up, clean up the App.
if (isAppNeedClean) {
DeleteApplicationHomeSubClusterResponse response =
deleteApplicationHomeSubCluster(delRequest);
if (response != null) {
LOG.info("The applicationId = {} has been successfully cleaned up.", applicationId);
return true;
}
}
return false;
}
/**
* Used to determine whether the Application is cleaned up.
*
* When the app in the RM is completed,
* the HomeSC corresponding to the app will be queried in the StateStore.
* If the current RM is the HomeSC, the completed app will be cleaned up.
*
* @param applicationId applicationId
* @return true, app needs to be cleaned up;
* false, app doesn't need to be cleaned up.
*/
private boolean isApplicationNeedClean(ApplicationId applicationId) {
GetApplicationHomeSubClusterRequest queryRequest =
GetApplicationHomeSubClusterRequest.newInstance(applicationId);
// Here we need to use try...catch,
// because getApplicationHomeSubCluster may throw not exist exception
try {
GetApplicationHomeSubClusterResponse queryResp =
getApplicationHomeSubCluster(queryRequest);
if (queryResp != null) {
ApplicationHomeSubCluster appHomeSC = queryResp.getApplicationHomeSubCluster();
SubClusterId homeSubClusterId = appHomeSC.getHomeSubCluster();
if (!subClusterId.equals(homeSubClusterId)) {
LOG.warn("The homeSubCluster of applicationId = {} belong subCluster = {}, " +
" not belong subCluster = {} and is not allowed to delete.",
applicationId, homeSubClusterId, subClusterId);
return false;
}
} else {
LOG.warn("The applicationId = {} not belong subCluster = {} " +
" and is not allowed to delete.", applicationId, subClusterId);
return false;
}
} catch (Exception e) {
LOG.warn("query applicationId = {} error.", applicationId, e);
return false;
}
return true;
}
}

View File

@ -20,22 +20,46 @@ package org.apache.hadoop.yarn.server.resourcemanager.federation;
import java.io.IOException;
import java.io.StringReader;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.xml.bind.JAXBException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.junit.After;
import org.junit.Assert;
@ -46,6 +70,8 @@ import com.sun.jersey.api.json.JSONConfiguration;
import com.sun.jersey.api.json.JSONJAXBContext;
import com.sun.jersey.api.json.JSONUnmarshaller;
import static org.mockito.Mockito.mock;
/**
* Unit tests for FederationStateStoreService.
*/
@ -207,4 +233,253 @@ public class TestFederationRMStateStoreService {
"Started federation membership heartbeat with interval: 300 and initial delay: 10"));
rm.stop();
}
@Test
public void testCleanUpApplication() throws Exception {
// set yarn configuration
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10);
conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
// set up MockRM
final MockRM rm = new MockRM(conf);
rm.init(conf);
stateStore = rm.getFederationStateStoreService().getStateStoreClient();
rm.start();
// init subCluster Heartbeat,
// and check that the subCluster is in a running state
FederationStateStoreService stateStoreService =
rm.getFederationStateStoreService();
FederationStateStoreHeartbeat storeHeartbeat =
stateStoreService.getStateStoreHeartbeatThread();
storeHeartbeat.run();
checkSubClusterInfo(SubClusterState.SC_RUNNING);
// generate an application and join the [SC-1] cluster
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
addApplication2StateStore(appId, stateStore);
// make sure the app can be queried in the stateStore
GetApplicationHomeSubClusterRequest appRequest =
GetApplicationHomeSubClusterRequest.newInstance(appId);
GetApplicationHomeSubClusterResponse response =
stateStore.getApplicationHomeSubCluster(appRequest);
Assert.assertNotNull(response);
ApplicationHomeSubCluster appHomeSubCluster = response.getApplicationHomeSubCluster();
Assert.assertNotNull(appHomeSubCluster);
Assert.assertNotNull(appHomeSubCluster.getApplicationId());
Assert.assertEquals(appId, appHomeSubCluster.getApplicationId());
// clean up the app.
boolean cleanUpResult =
stateStoreService.cleanUpFinishApplicationsWithRetries(appId, true);
Assert.assertTrue(cleanUpResult);
// after clean, the app can no longer be queried from the stateStore.
LambdaTestUtils.intercept(FederationStateStoreException.class,
"Application " + appId + " does not exist",
() -> stateStore.getApplicationHomeSubCluster(appRequest));
}
@Test
public void testCleanUpApplicationWhenRMStart() throws Exception {
// We design such a test case.
// Step1. We add app01, app02, app03 to the stateStore,
// But these apps are not in RM's RMContext, they are finished apps
// Step2. We simulate RM startup, there is only app04 in RMContext.
// Step3. We wait for 5 seconds, the automatic cleanup thread should clean up finished apps.
// set yarn configuration.
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10);
conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
// set up MockRM.
MockRM rm = new MockRM(conf);
rm.init(conf);
stateStore = rm.getFederationStateStoreService().getStateStoreClient();
// generate an [app01] and join the [SC-1] cluster.
List<ApplicationId> appIds = new ArrayList<>();
ApplicationId appId01 = ApplicationId.newInstance(Time.now(), 1);
addApplication2StateStore(appId01, stateStore);
appIds.add(appId01);
// generate an [app02] and join the [SC-1] cluster.
ApplicationId appId02 = ApplicationId.newInstance(Time.now(), 2);
addApplication2StateStore(appId02, stateStore);
appIds.add(appId02);
// generate an [app03] and join the [SC-1] cluster.
ApplicationId appId03 = ApplicationId.newInstance(Time.now(), 3);
addApplication2StateStore(appId03, stateStore);
appIds.add(appId03);
// make sure the apps can be queried in the stateStore.
GetApplicationsHomeSubClusterRequest allRequest =
GetApplicationsHomeSubClusterRequest.newInstance(subClusterId);
GetApplicationsHomeSubClusterResponse allResponse =
stateStore.getApplicationsHomeSubCluster(allRequest);
Assert.assertNotNull(allResponse);
List<ApplicationHomeSubCluster> appHomeSCLists = allResponse.getAppsHomeSubClusters();
Assert.assertNotNull(appHomeSCLists);
Assert.assertEquals(3, appHomeSCLists.size());
// app04 exists in both RM memory and stateStore.
ApplicationId appId04 = ApplicationId.newInstance(Time.now(), 4);
addApplication2StateStore(appId04, stateStore);
addApplication2RMAppManager(rm, appId04);
// start rm.
rm.start();
// wait 5s, wait for the thread to finish cleaning up.
GenericTestUtils.waitFor(() -> {
int appsSize = 0;
try {
List<ApplicationHomeSubCluster> subClusters =
getApplicationsFromStateStore();
Assert.assertNotNull(subClusters);
appsSize = subClusters.size();
} catch (YarnException e) {
e.printStackTrace();
}
return (appsSize == 1);
}, 100, 1000 * 5);
// check the app to make sure the apps(app01,app02,app03) doesn't exist.
for (ApplicationId appId : appIds) {
GetApplicationHomeSubClusterRequest appRequest =
GetApplicationHomeSubClusterRequest.newInstance(appId);
LambdaTestUtils.intercept(FederationStateStoreException.class,
"Application " + appId + " does not exist",
() -> stateStore.getApplicationHomeSubCluster(appRequest));
}
if (rm != null) {
rm.stop();
rm = null;
}
}
@Test
public void testCleanUpApplicationWhenRMCompleteOneApp() throws Exception {
// We design such a test case.
// Step1. We start RMSet the RM memory to keep a maximum of 1 completed app.
// Step2. Register app[01-03] to RM memory & stateStore.
// Step3. We clean up app01, app02, app03, at this time,
// app01, app02 should be cleaned up from statestore, app03 should remain in statestore.
// set yarn configuration.
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10);
conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 1);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
// set up MockRM.
MockRM rm = new MockRM(conf);
rm.init(conf);
stateStore = rm.getFederationStateStoreService().getStateStoreClient();
rm.start();
// generate an [app01] and join the [SC-1] cluster.
List<ApplicationId> appIds = new ArrayList<>();
ApplicationId appId01 = ApplicationId.newInstance(Time.now(), 1);
addApplication2StateStore(appId01, stateStore);
addApplication2RMAppManager(rm, appId01);
appIds.add(appId01);
// generate an [app02] and join the [SC-1] cluster.
ApplicationId appId02 = ApplicationId.newInstance(Time.now(), 2);
addApplication2StateStore(appId02, stateStore);
addApplication2RMAppManager(rm, appId02);
appIds.add(appId02);
// generate an [app03] and join the [SC-1] cluster.
ApplicationId appId03 = ApplicationId.newInstance(Time.now(), 3);
addApplication2StateStore(appId03, stateStore);
addApplication2RMAppManager(rm, appId03);
// rmAppManager
RMAppManager rmAppManager = rm.getRMAppManager();
rmAppManager.finishApplication4Test(appId01);
rmAppManager.finishApplication4Test(appId02);
rmAppManager.finishApplication4Test(appId03);
rmAppManager.checkAppNumCompletedLimit4Test();
// app01, app02 should be cleaned from statestore
// After the query, it should report the error not exist.
for (ApplicationId appId : appIds) {
GetApplicationHomeSubClusterRequest appRequest =
GetApplicationHomeSubClusterRequest.newInstance(appId);
LambdaTestUtils.intercept(FederationStateStoreException.class,
"Application " + appId + " does not exist",
() -> stateStore.getApplicationHomeSubCluster(appRequest));
}
// app03 should remain in statestore
List<ApplicationHomeSubCluster> appHomeScList = getApplicationsFromStateStore();
Assert.assertNotNull(appHomeScList);
Assert.assertEquals(1, appHomeScList.size());
ApplicationHomeSubCluster homeSubCluster = appHomeScList.get(0);
Assert.assertNotNull(homeSubCluster);
Assert.assertEquals(appId03, homeSubCluster.getApplicationId());
}
private void addApplication2StateStore(ApplicationId appId,
FederationStateStore fedStateStore) throws YarnException {
ApplicationHomeSubCluster appHomeSC = ApplicationHomeSubCluster.newInstance(
appId, subClusterId);
AddApplicationHomeSubClusterRequest addHomeSCRequest =
AddApplicationHomeSubClusterRequest.newInstance(appHomeSC);
fedStateStore.addApplicationHomeSubCluster(addHomeSCRequest);
}
private List<ApplicationHomeSubCluster> getApplicationsFromStateStore() throws YarnException {
// make sure the apps can be queried in the stateStore
GetApplicationsHomeSubClusterRequest allRequest =
GetApplicationsHomeSubClusterRequest.newInstance(subClusterId);
GetApplicationsHomeSubClusterResponse allResponse =
stateStore.getApplicationsHomeSubCluster(allRequest);
Assert.assertNotNull(allResponse);
List<ApplicationHomeSubCluster> appHomeSCLists = allResponse.getAppsHomeSubClusters();
Assert.assertNotNull(appHomeSCLists);
return appHomeSCLists;
}
private void addApplication2RMAppManager(MockRM rm, ApplicationId appId) {
RMContext rmContext = rm.getRMContext();
Map<ApplicationId, RMApp> rmAppMaps = rmContext.getRMApps();
String user = MockApps.newUserName();
String name = MockApps.newAppName();
String queue = MockApps.newQueue();
YarnScheduler scheduler = mock(YarnScheduler.class);
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler);
ApplicationSubmissionContext submissionContext =
new ApplicationSubmissionContextPBImpl();
// applicationId will not be used because RMStateStore is mocked,
// but applicationId is still set for safety
submissionContext.setApplicationId(appId);
submissionContext.setPriority(Priority.newInstance(0));
RMApp application = new RMAppImpl(appId, rmContext, conf, name,
user, queue, submissionContext, scheduler, masterService,
System.currentTimeMillis(), "YARN", null,
new ArrayList<>());
rmAppMaps.putIfAbsent(application.getApplicationId(), application);
}
}