YARN-5101. YARN_APPLICATION_UPDATED event is parsed in ApplicationHistoryManagerOnTimelineStore#convertToApplicationReport with reversed order. Contributed by Sunil G.

This commit is contained in:
Rohith Sharma K S 2016-10-06 18:16:48 +05:30
parent 311954883f
commit 4d2f380d78
3 changed files with 21 additions and 9 deletions

View File

@ -351,6 +351,7 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
} }
} }
List<TimelineEvent> events = entity.getEvents(); List<TimelineEvent> events = entity.getEvents();
long updatedTimeStamp = 0L;
if (events != null) { if (events != null) {
for (TimelineEvent event : events) { for (TimelineEvent event : events) {
if (event.getEventType().equals( if (event.getEventType().equals(
@ -358,9 +359,16 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
createdTime = event.getTimestamp(); createdTime = event.getTimestamp();
} else if (event.getEventType().equals( } else if (event.getEventType().equals(
ApplicationMetricsConstants.UPDATED_EVENT_TYPE)) { ApplicationMetricsConstants.UPDATED_EVENT_TYPE)) {
// TODO: YARN-5101. This type of events are parsed in // This type of events are parsed in time-stamp descending order
// time-stamp descending order which means the previous event // which means the previous event could override the information
// could override the information from the later same type of event. // from the later same type of event. Hence compare timestamp
// before over writing.
if (event.getTimestamp() > updatedTimeStamp) {
updatedTimeStamp = event.getTimestamp();
} else {
continue;
}
Map<String, Object> eventInfo = event.getEventInfo(); Map<String, Object> eventInfo = event.getEventInfo();
if (eventInfo == null) { if (eventInfo == null) {
continue; continue;

View File

@ -551,23 +551,27 @@ public class TestApplicationHistoryManagerOnTimelineStore {
entity.addEvent(tEvent); entity.addEvent(tEvent);
if (enableUpdateEvent) { if (enableUpdateEvent) {
tEvent = new TimelineEvent(); tEvent = new TimelineEvent();
createAppModifiedEvent(appId, tEvent, "changed queue", 5); long updatedTimeIndex = 4L;
createAppModifiedEvent(appId, tEvent, updatedTimeIndex++, "changed queue",
5);
entity.addEvent(tEvent); entity.addEvent(tEvent);
// Change priority alone // Change priority alone
tEvent = new TimelineEvent(); tEvent = new TimelineEvent();
createAppModifiedEvent(appId, tEvent, "changed queue", 6); createAppModifiedEvent(appId, tEvent, updatedTimeIndex++, "changed queue",
6);
// Now change queue // Now change queue
tEvent = new TimelineEvent(); tEvent = new TimelineEvent();
createAppModifiedEvent(appId, tEvent, "changed queue1", 6); createAppModifiedEvent(appId, tEvent, updatedTimeIndex++,
"changed queue1", 6);
entity.addEvent(tEvent); entity.addEvent(tEvent);
} }
return entity; return entity;
} }
private static void createAppModifiedEvent(ApplicationId appId, private static void createAppModifiedEvent(ApplicationId appId,
TimelineEvent tEvent, String queue, int priority) { TimelineEvent tEvent, long updatedTimeIndex, String queue, int priority) {
tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
tEvent.setTimestamp(Integer.MAX_VALUE + 4L + appId.getId()); tEvent.setTimestamp(Integer.MAX_VALUE + updatedTimeIndex + appId.getId());
Map<String, Object> eventInfo = new HashMap<String, Object>(); Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, queue); eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, queue);
eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,

View File

@ -1061,7 +1061,7 @@ public class RMAppImpl implements RMApp, Recoverable {
} }
app.rmContext.getSystemMetricsPublisher().appUpdated(app, app.rmContext.getSystemMetricsPublisher().appUpdated(app,
System.currentTimeMillis()); app.systemClock.getTime());
// TODO: Write out change to state store (YARN-1558) // TODO: Write out change to state store (YARN-1558)
// Also take care of RM failover // Also take care of RM failover