YARN-6455. Enhance the timelinewriter.flush() race condition fix (Haibo Chen via Varun Saxena)

This commit is contained in:
Varun Saxena 2017-04-27 15:01:58 +05:30
parent 62579b69a0
commit 793bbf216d
2 changed files with 4 additions and 8 deletions

View File

@ -59,6 +59,7 @@ public class AppLevelTimelineCollector extends TimelineCollector {
private final TimelineCollectorContext context; private final TimelineCollectorContext context;
private ScheduledThreadPoolExecutor appAggregationExecutor; private ScheduledThreadPoolExecutor appAggregationExecutor;
private AppLevelAggregator appAggregator; private AppLevelAggregator appAggregator;
private UserGroupInformation currentUser;
public AppLevelTimelineCollector(ApplicationId appId) { public AppLevelTimelineCollector(ApplicationId appId) {
super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString()); super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString());
@ -82,7 +83,8 @@ protected void serviceInit(Configuration conf) throws Exception {
// Set the default values, which will be updated with an RPC call to get the // Set the default values, which will be updated with an RPC call to get the
// context info from NM. // context info from NM.
// Current user usually is not the app user, but keep this field non-null // Current user usually is not the app user, but keep this field non-null
context.setUserId(UserGroupInformation.getCurrentUser().getShortUserName()); currentUser = UserGroupInformation.getCurrentUser();
context.setUserId(currentUser.getShortUserName());
context.setAppId(appId.toString()); context.setAppId(appId.toString());
super.serviceInit(conf); super.serviceInit(conf);
} }
@ -149,9 +151,7 @@ private void aggregate() {
TimelineEntityType.YARN_APPLICATION.toString()); TimelineEntityType.YARN_APPLICATION.toString());
TimelineEntities entities = new TimelineEntities(); TimelineEntities entities = new TimelineEntities();
entities.addEntity(resultEntity); entities.addEntity(resultEntity);
getWriter().write(currContext.getClusterId(), currContext.getUserId(), putEntitiesAsync(entities, currentUser);
currContext.getFlowName(), currContext.getFlowVersion(),
currContext.getFlowRunId(), currContext.getAppId(), entities);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error aggregating timeline metrics", e); LOG.error("Error aggregating timeline metrics", e);
} }

View File

@ -85,10 +85,6 @@ protected void setWriter(TimelineWriter w) {
this.writer = w; this.writer = w;
} }
protected TimelineWriter getWriter() {
return writer;
}
protected Map<String, AggregationStatusTable> getAggregationGroups() { protected Map<String, AggregationStatusTable> getAggregationGroups() {
return aggregationGroups; return aggregationGroups;
} }