YARN-8679. [ATSv2] If HBase cluster is down for long time, high chances that NM ContainerManager dispatcher get blocked. Contributed by Wangda Tan.
This commit is contained in:
parent
79c97f6a0b
commit
4aacbfff60
|
@ -90,7 +90,7 @@ public class TestAMLaunchFailure {
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// @Override
|
// @Override
|
||||||
// public void addApplication(ApplicationId applicationId,
|
// public void addApplicationIfAbsent(ApplicationId applicationId,
|
||||||
// ApplicationMaster master, String user, String queue, Priority priority
|
// ApplicationMaster master, String user, String queue, Priority priority
|
||||||
// , ApplicationStore appStore)
|
// , ApplicationStore appStore)
|
||||||
// throws IOException {
|
// throws IOException {
|
||||||
|
|
|
@ -67,7 +67,7 @@ public class TestSchedulerNegotiator {
|
||||||
// return null;
|
// return null;
|
||||||
// }
|
// }
|
||||||
// @Override
|
// @Override
|
||||||
// public void addApplication(ApplicationId applicationId,
|
// public void addApplicationIfAbsent(ApplicationId applicationId,
|
||||||
// ApplicationMaster master, String user, String queue, Priority priority,
|
// ApplicationMaster master, String user, String queue, Priority priority,
|
||||||
// ApplicationStore store)
|
// ApplicationStore store)
|
||||||
// throws IOException {
|
// throws IOException {
|
||||||
|
|
|
@ -81,7 +81,8 @@ public class TestTimelineServiceClientIntegration {
|
||||||
auxService =
|
auxService =
|
||||||
PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
|
PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
|
||||||
collectorManager, conf);
|
collectorManager, conf);
|
||||||
auxService.addApplication(ApplicationId.newInstance(0, 1), "user");
|
auxService
|
||||||
|
.addApplicationIfAbsent(ApplicationId.newInstance(0, 1), "user");
|
||||||
} catch (ExitUtil.ExitException e) {
|
} catch (ExitUtil.ExitException e) {
|
||||||
fail();
|
fail();
|
||||||
}
|
}
|
||||||
|
|
|
@ -210,7 +210,7 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL, "localhost");
|
YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL, "localhost");
|
||||||
}
|
}
|
||||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||||
auxService.addApplication(
|
auxService.addApplicationIfAbsent(
|
||||||
appId, UserGroupInformation.getCurrentUser().getUserName());
|
appId, UserGroupInformation.getCurrentUser().getUserName());
|
||||||
if (!withKerberosLogin) {
|
if (!withKerberosLogin) {
|
||||||
AppLevelTimelineCollector collector =
|
AppLevelTimelineCollector collector =
|
||||||
|
|
|
@ -125,7 +125,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
|
||||||
* @param user Application Master container user.
|
* @param user Application Master container user.
|
||||||
* @return whether it was added successfully
|
* @return whether it was added successfully
|
||||||
*/
|
*/
|
||||||
public boolean addApplication(ApplicationId appId, String user) {
|
public boolean addApplicationIfAbsent(ApplicationId appId, String user) {
|
||||||
AppLevelTimelineCollector collector =
|
AppLevelTimelineCollector collector =
|
||||||
new AppLevelTimelineCollectorWithAgg(appId, user);
|
new AppLevelTimelineCollectorWithAgg(appId, user);
|
||||||
return (collectorManager.putIfAbsent(appId, collector)
|
return (collectorManager.putIfAbsent(appId, collector)
|
||||||
|
@ -156,15 +156,15 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
|
||||||
if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
|
if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
|
||||||
ApplicationId appId = context.getContainerId().
|
ApplicationId appId = context.getContainerId().
|
||||||
getApplicationAttemptId().getApplicationId();
|
getApplicationAttemptId().getApplicationId();
|
||||||
synchronized (appIdToContainerId) {
|
synchronized (appIdToContainerId){
|
||||||
Set<ContainerId> masterContainers = appIdToContainerId.get(appId);
|
Set<ContainerId> masterContainers = appIdToContainerId.get(appId);
|
||||||
if (masterContainers == null) {
|
if (masterContainers == null) {
|
||||||
masterContainers = new HashSet<>();
|
masterContainers = new HashSet<>();
|
||||||
appIdToContainerId.put(appId, masterContainers);
|
appIdToContainerId.put(appId, masterContainers);
|
||||||
}
|
}
|
||||||
masterContainers.add(context.getContainerId());
|
masterContainers.add(context.getContainerId());
|
||||||
addApplication(appId, context.getUser());
|
|
||||||
}
|
}
|
||||||
|
addApplicationIfAbsent(appId, context.getUser());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,6 +189,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
|
||||||
containerId.getApplicationAttemptId().getApplicationId();
|
containerId.getApplicationAttemptId().getApplicationId();
|
||||||
return scheduler.schedule(new Runnable() {
|
return scheduler.schedule(new Runnable() {
|
||||||
public void run() {
|
public void run() {
|
||||||
|
boolean shouldRemoveApplication = false;
|
||||||
synchronized (appIdToContainerId) {
|
synchronized (appIdToContainerId) {
|
||||||
Set<ContainerId> masterContainers = appIdToContainerId.get(appId);
|
Set<ContainerId> masterContainers = appIdToContainerId.get(appId);
|
||||||
if (masterContainers == null) {
|
if (masterContainers == null) {
|
||||||
|
@ -199,10 +200,14 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
|
||||||
masterContainers.remove(containerId);
|
masterContainers.remove(containerId);
|
||||||
if (masterContainers.size() == 0) {
|
if (masterContainers.size() == 0) {
|
||||||
// remove only if it is last master container
|
// remove only if it is last master container
|
||||||
removeApplication(appId);
|
shouldRemoveApplication = true;
|
||||||
appIdToContainerId.remove(appId);
|
appIdToContainerId.remove(appId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (shouldRemoveApplication) {
|
||||||
|
removeApplication(appId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}, collectorLingerPeriod, TimeUnit.MILLISECONDS);
|
}, collectorLingerPeriod, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue