YARN-9013. [GPG] fix order of steps cleaning Registry entries in ApplicationCleaner. Contributed by Botong Huang.
This commit is contained in:
parent
d93507ef15
commit
6e70802532
|
@ -19,7 +19,6 @@
|
|||
package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.lang3.time.DurationFormatUtils;
|
||||
|
@ -96,6 +95,10 @@ public abstract class ApplicationCleaner implements Runnable {
|
|||
return this.gpgContext;
|
||||
}
|
||||
|
||||
public FederationRegistryClient getRegistryClient() {
|
||||
return this.registryClient;
|
||||
}
|
||||
|
||||
/**
|
||||
* Query router for applications.
|
||||
*
|
||||
|
@ -154,18 +157,6 @@ public abstract class ApplicationCleaner implements Runnable {
|
|||
+ " success Router queries after " + totalAttemptCount + " retries");
|
||||
}
|
||||
|
||||
protected void cleanupAppRecordInRegistry(Set<ApplicationId> knownApps) {
|
||||
List<String> allApps = this.registryClient.getAllApplications();
|
||||
LOG.info("Got " + allApps.size() + " existing apps in registry");
|
||||
for (String app : allApps) {
|
||||
ApplicationId appId = ApplicationId.fromString(app);
|
||||
if (!knownApps.contains(appId)) {
|
||||
LOG.info("removing finished application entry for {}", app);
|
||||
this.registryClient.removeAppFromRegistry(appId, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public abstract void run();
|
||||
}
|
||||
|
|
|
@ -46,26 +46,37 @@ public class DefaultApplicationCleaner extends ApplicationCleaner {
|
|||
LOG.info("Application cleaner run at time {}", now);
|
||||
|
||||
FederationStateStoreFacade facade = getGPGContext().getStateStoreFacade();
|
||||
Set<ApplicationId> candidates = new HashSet<ApplicationId>();
|
||||
try {
|
||||
// Get the candidate list from StateStore before calling router
|
||||
Set<ApplicationId> allStateStoreApps = new HashSet<ApplicationId>();
|
||||
List<ApplicationHomeSubCluster> response =
|
||||
facade.getApplicationsHomeSubCluster();
|
||||
for (ApplicationHomeSubCluster app : response) {
|
||||
candidates.add(app.getApplicationId());
|
||||
allStateStoreApps.add(app.getApplicationId());
|
||||
}
|
||||
LOG.info("{} app entries in FederationStateStore", candidates.size());
|
||||
LOG.info("{} app entries in FederationStateStore",
|
||||
allStateStoreApps.size());
|
||||
|
||||
// Get the candidate list from Registry before calling router
|
||||
List<String> allRegistryApps = getRegistryClient().getAllApplications();
|
||||
LOG.info("{} app entries in FederationRegistry",
|
||||
allStateStoreApps.size());
|
||||
|
||||
// Get the list of known apps from Router
|
||||
Set<ApplicationId> routerApps = getRouterKnownApplications();
|
||||
LOG.info("{} known applications from Router", routerApps.size());
|
||||
|
||||
candidates = Sets.difference(candidates, routerApps);
|
||||
LOG.info("Deleting {} applications from statestore", candidates.size());
|
||||
// Clean up StateStore entries
|
||||
Set<ApplicationId> toDelete =
|
||||
Sets.difference(allStateStoreApps, routerApps);
|
||||
LOG.info("Deleting {} applications from statestore", toDelete.size());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Apps to delete: ", candidates.stream().map(Object::toString)
|
||||
LOG.debug("Apps to delete: ", toDelete.stream().map(Object::toString)
|
||||
.collect(Collectors.joining(",")));
|
||||
}
|
||||
for (ApplicationId appId : candidates) {
|
||||
for (ApplicationId appId : toDelete) {
|
||||
try {
|
||||
LOG.debug("Deleting {} from statestore ", appId);
|
||||
facade.deleteApplicationHomeSubCluster(appId);
|
||||
} catch (Exception e) {
|
||||
LOG.error(
|
||||
|
@ -74,8 +85,15 @@ public class DefaultApplicationCleaner extends ApplicationCleaner {
|
|||
}
|
||||
}
|
||||
|
||||
// Clean up registry entries
|
||||
cleanupAppRecordInRegistry(routerApps);
|
||||
// Clean up Registry entries
|
||||
for (String app : allRegistryApps) {
|
||||
ApplicationId appId = ApplicationId.fromString(app);
|
||||
if (!routerApps.contains(appId)) {
|
||||
LOG.debug("removing finished application entry for {}", app);
|
||||
getRegistryClient().removeAppFromRegistry(appId, true);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Throwable e) {
|
||||
LOG.error("Application cleaner started at time " + now + " fails: ", e);
|
||||
}
|
||||
|
|
|
@ -63,6 +63,8 @@ public class TestDefaultApplicationCleaner {
|
|||
// The list of applications returned by mocked router
|
||||
private Set<ApplicationId> routerAppIds;
|
||||
|
||||
private ApplicationId appIdToAddConcurrently;
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
conf = new YarnConfiguration();
|
||||
|
@ -111,6 +113,8 @@ public class TestDefaultApplicationCleaner {
|
|||
new Token<AMRMTokenIdentifier>());
|
||||
}
|
||||
Assert.assertEquals(3, registryClient.getAllApplications().size());
|
||||
|
||||
appIdToAddConcurrently = null;
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -149,13 +153,45 @@ public class TestDefaultApplicationCleaner {
|
|||
Assert.assertEquals(1, registryClient.getAllApplications().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConcurrentNewApp() throws YarnException {
|
||||
appIdToAddConcurrently = ApplicationId.newInstance(1, 1);
|
||||
|
||||
appCleaner.run();
|
||||
|
||||
// The concurrently added app should be still there
|
||||
Assert.assertEquals(1,
|
||||
stateStore
|
||||
.getApplicationsHomeSubCluster(
|
||||
GetApplicationsHomeSubClusterRequest.newInstance())
|
||||
.getAppsHomeSubClusters().size());
|
||||
|
||||
// The concurrently added app should be still there
|
||||
Assert.assertEquals(1, registryClient.getAllApplications().size());
|
||||
}
|
||||
|
||||
/**
|
||||
* Testable version of DefaultApplicationCleaner.
|
||||
*/
|
||||
public class TestableDefaultApplicationCleaner
|
||||
extends DefaultApplicationCleaner {
|
||||
|
||||
@Override
|
||||
public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
|
||||
if (appIdToAddConcurrently != null) {
|
||||
SubClusterId scId = SubClusterId.newInstance("MySubClusterId");
|
||||
try {
|
||||
stateStore
|
||||
.addApplicationHomeSubCluster(AddApplicationHomeSubClusterRequest
|
||||
.newInstance(ApplicationHomeSubCluster
|
||||
.newInstance(appIdToAddConcurrently, scId)));
|
||||
} catch (YarnException e) {
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
|
||||
registryClient.writeAMRMTokenForUAM(appIdToAddConcurrently,
|
||||
scId.toString(), new Token<AMRMTokenIdentifier>());
|
||||
}
|
||||
return routerAppIds;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue