diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index 53fe055f75e..a1a31f9df34 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -78,6 +78,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.util.TimelineServiceHelper; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.node.JsonNodeFactory; import com.google.common.annotations.VisibleForTesting; import com.sun.jersey.api.client.ClientHandlerException; @@ -1124,7 +1127,7 @@ public class JobHistoryEventHandler extends AbstractService private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity createTaskEntity(HistoryEvent event, long timestamp, String taskId, String entityType, String relatedJobEntity, JobId jobId, - boolean setCreatedTime) { + boolean setCreatedTime, long taskIdPrefix) { org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = createBaseEntity(event, timestamp, entityType, setCreatedTime); entity.setId(taskId); @@ -1133,6 +1136,7 @@ public class JobHistoryEventHandler extends AbstractService ((TaskStartedEvent)event).getTaskType().toString()); } entity.addIsRelatedToEntity(relatedJobEntity, jobId.toString()); + entity.setIdPrefix(taskIdPrefix); return entity; } @@ -1141,11 +1145,12 @@ public class JobHistoryEventHandler extends AbstractService private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity createTaskAttemptEntity(HistoryEvent event, long timestamp, String taskAttemptId, String entityType, String relatedTaskEntity, - String taskId, boolean setCreatedTime) { + String taskId, boolean setCreatedTime, long taskAttemptIdPrefix) { org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = createBaseEntity(event, timestamp, entityType, setCreatedTime); entity.setId(taskAttemptId); entity.addIsRelatedToEntity(relatedTaskEntity, taskId); + entity.setIdPrefix(taskAttemptIdPrefix); return entity; } @@ -1196,6 +1201,8 @@ public class JobHistoryEventHandler extends AbstractService String taskId = null; String taskAttemptId = null; boolean setCreatedTime = false; + long taskIdPrefix = 0; + long taskAttemptIdPrefix = 0; switch (event.getEventType()) { // Handle job events @@ -1218,15 +1225,21 @@ public class JobHistoryEventHandler extends AbstractService case TASK_STARTED: setCreatedTime = true; taskId = ((TaskStartedEvent)event).getTaskId().toString(); + taskIdPrefix = TimelineServiceHelper. + invertLong(((TaskStartedEvent)event).getStartTime()); break; case TASK_FAILED: taskId = ((TaskFailedEvent)event).getTaskId().toString(); + taskIdPrefix = TimelineServiceHelper. + invertLong(((TaskFailedEvent)event).getStartTime()); break; case TASK_UPDATED: taskId = ((TaskUpdatedEvent)event).getTaskId().toString(); break; case TASK_FINISHED: taskId = ((TaskFinishedEvent)event).getTaskId().toString(); + taskIdPrefix = TimelineServiceHelper. + invertLong(((TaskFinishedEvent)event).getStartTime()); break; case MAP_ATTEMPT_STARTED: case REDUCE_ATTEMPT_STARTED: @@ -1234,6 +1247,8 @@ public class JobHistoryEventHandler extends AbstractService taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString(); taskAttemptId = ((TaskAttemptStartedEvent)event). getTaskAttemptId().toString(); + taskAttemptIdPrefix = TimelineServiceHelper. + invertLong(((TaskAttemptStartedEvent)event).getStartTime()); break; case CLEANUP_ATTEMPT_STARTED: case SETUP_ATTEMPT_STARTED: @@ -1253,16 +1268,22 @@ public class JobHistoryEventHandler extends AbstractService getTaskId().toString(); taskAttemptId = ((TaskAttemptUnsuccessfulCompletionEvent)event). getTaskAttemptId().toString(); + taskAttemptIdPrefix = TimelineServiceHelper.invertLong( + ((TaskAttemptUnsuccessfulCompletionEvent)event).getStartTime()); break; case MAP_ATTEMPT_FINISHED: taskId = ((MapAttemptFinishedEvent)event).getTaskId().toString(); taskAttemptId = ((MapAttemptFinishedEvent)event). getAttemptId().toString(); + taskAttemptIdPrefix = TimelineServiceHelper. + invertLong(((MapAttemptFinishedEvent)event).getStartTime()); break; case REDUCE_ATTEMPT_FINISHED: taskId = ((ReduceAttemptFinishedEvent)event).getTaskId().toString(); taskAttemptId = ((ReduceAttemptFinishedEvent)event). getAttemptId().toString(); + taskAttemptIdPrefix = TimelineServiceHelper. + invertLong(((ReduceAttemptFinishedEvent)event).getStartTime()); break; case SETUP_ATTEMPT_FINISHED: case CLEANUP_ATTEMPT_FINISHED: @@ -1291,12 +1312,12 @@ public class JobHistoryEventHandler extends AbstractService // TaskEntity tEntity = createTaskEntity(event, timestamp, taskId, MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE, - jobId, setCreatedTime); + jobId, setCreatedTime, taskIdPrefix); } else { // TaskAttemptEntity tEntity = createTaskAttemptEntity(event, timestamp, taskAttemptId, MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE, MAPREDUCE_TASK_ENTITY_TYPE, - taskId, setCreatedTime); + taskId, setCreatedTime, taskAttemptIdPrefix); } } try { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 9ea1b9aa922..3faad480b9d 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -1530,7 +1530,7 @@ public abstract class TaskAttemptImpl implements StringUtils.join( LINE_SEPARATOR, taskAttempt.getDiagnostics()), taskAttempt.getCounters(), taskAttempt - .getProgressSplitBlock().burst()); + .getProgressSplitBlock().burst(), taskAttempt.launchTime); return tauce; } @@ -1943,35 +1943,35 @@ public abstract class TaskAttemptImpl implements this.container == null ? -1 : this.container.getNodeId().getPort(); if (attemptId.getTaskId().getTaskType() == TaskType.MAP) { MapAttemptFinishedEvent mfe = - new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId), - TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), - state.toString(), - this.reportedStatus.mapFinishTime, - finishTime, - containerHostName, - containerNodePort, - this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName, - this.reportedStatus.stateString, - getCounters(), - getProgressSplitBlock().burst()); - eventHandler.handle( - new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe)); + new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId), + TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), + state.toString(), + this.reportedStatus.mapFinishTime, + finishTime, + containerHostName, + containerNodePort, + this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName, + this.reportedStatus.stateString, + getCounters(), + getProgressSplitBlock().burst(), launchTime); + eventHandler.handle( + new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe)); } else { - ReduceAttemptFinishedEvent rfe = - new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId), - TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), - state.toString(), - this.reportedStatus.shuffleFinishTime, - this.reportedStatus.sortFinishTime, - finishTime, - containerHostName, - containerNodePort, - this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName, - this.reportedStatus.stateString, - getCounters(), - getProgressSplitBlock().burst()); - eventHandler.handle( - new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe)); + ReduceAttemptFinishedEvent rfe = + new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId), + TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), + state.toString(), + this.reportedStatus.shuffleFinishTime, + this.reportedStatus.sortFinishTime, + finishTime, + containerHostName, + containerNodePort, + this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName, + this.reportedStatus.stateString, + getCounters(), + getProgressSplitBlock().burst(), launchTime); + eventHandler.handle( + new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe)); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index 8a6fa304d4e..228ae24a955 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -139,6 +139,8 @@ public abstract class TaskImpl implements Task, EventHandler { private final Set inProgressAttempts; private boolean historyTaskStartGenerated = false; + // Launch time reported in history events. + private long launchTime; private static final SingleArcTransition ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition(); @@ -705,8 +707,9 @@ public abstract class TaskImpl implements Task, EventHandler { } private void sendTaskStartedEvent() { + launchTime = getLaunchTime(); TaskStartedEvent tse = new TaskStartedEvent( - TypeConverter.fromYarn(taskId), getLaunchTime(), + TypeConverter.fromYarn(taskId), launchTime, TypeConverter.fromYarn(taskId.getTaskType()), getSplitsAsString()); eventHandler @@ -714,18 +717,19 @@ public abstract class TaskImpl implements Task, EventHandler { historyTaskStartGenerated = true; } - private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) { + private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, + TaskStateInternal taskState) { TaskFinishedEvent tfe = new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId), TypeConverter.fromYarn(task.successfulAttempt), task.getFinishTime(task.successfulAttempt), TypeConverter.fromYarn(task.taskId.getTaskType()), - taskState.toString(), - task.getCounters()); + taskState.toString(), task.getCounters(), task.launchTime); return tfe; } - private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List diag, TaskStateInternal taskState, TaskAttemptId taId) { + private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, + List diag, TaskStateInternal taskState, TaskAttemptId taId) { StringBuilder errorSb = new StringBuilder(); if (diag != null) { for (String d : diag) { @@ -740,7 +744,7 @@ public abstract class TaskImpl implements Task, EventHandler { errorSb.toString(), taskState.toString(), taId == null ? null : TypeConverter.fromYarn(taId), - task.getCounters()); + task.getCounters(), task.launchTime); return taskFailedEvent; } @@ -861,7 +865,8 @@ public abstract class TaskImpl implements Task, EventHandler { TaskFailedEvent tfe = new TaskFailedEvent(taskInfo.getTaskId(), taskInfo.getFinishTime(), taskInfo.getTaskType(), taskInfo.getError(), taskInfo.getTaskStatus(), - taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters()); + taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters(), + launchTime); eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe)); eventHandler.handle( new JobTaskEvent(taskId, getExternalState(taskState))); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java index ac510b39434..e2713191ac7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java @@ -58,7 +58,7 @@ public class TestEvents { Counters counters = new Counters(); TaskAttemptFinishedEvent test = new TaskAttemptFinishedEvent(taskAttemptId, TaskType.REDUCE, "TEST", 123L, "RAKNAME", "HOSTNAME", "STATUS", - counters); + counters, 234); assertEquals(test.getAttemptId().toString(), taskAttemptId.toString()); assertEquals(test.getCounters(), counters); @@ -69,7 +69,7 @@ public class TestEvents { assertEquals(test.getTaskId(), tid); assertEquals(test.getTaskStatus(), "TEST"); assertEquals(test.getTaskType(), TaskType.REDUCE); - + assertEquals(234, test.getStartTime()); } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java index caf8c6718a9..e35a84d537e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java @@ -148,7 +148,7 @@ public class TestJobHistoryEventHandler { // First completion event, but min-queue-size for batching flushes is 10 handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent( - t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null))); + t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0))); verify(mockWriter).flush(); } finally { @@ -184,7 +184,7 @@ public class TestJobHistoryEventHandler { for (int i = 0 ; i < 100 ; i++) { queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent( - t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null))); + t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0))); } handleNextNEvents(jheh, 9); @@ -229,7 +229,7 @@ public class TestJobHistoryEventHandler { for (int i = 0 ; i < 100 ; i++) { queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent( - t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null))); + t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0))); } handleNextNEvents(jheh, 9); @@ -272,7 +272,7 @@ public class TestJobHistoryEventHandler { for (int i = 0 ; i < 100 ; i++) { queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent( - t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null))); + t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0))); } queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent( TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters()))); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java index 3121c4e0016..2b1357ea859 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java @@ -32,9 +32,10 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.util.SystemClock; /** - * Event to record successful completion of a map attempt + * Event to record successful completion of a map attempt. * */ @InterfaceAudience.Private @@ -58,9 +59,10 @@ public class MapAttemptFinishedEvent implements HistoryEvent { int[] cpuUsages; int[] vMemKbytes; int[] physMemKbytes; + private long startTime; /** - * Create an event for successful completion of map attempts + * Create an event for successful completion of map attempts. * @param id Task Attempt ID * @param taskType Type of the task * @param taskStatus Status of the task @@ -77,12 +79,13 @@ public class MapAttemptFinishedEvent implements HistoryEvent { * virtual memory and physical memory. * * If you have no splits data, code {@code null} for this - * parameter. + * parameter. + * @param startTs Task start time to be used for writing entity to ATSv2. */ - public MapAttemptFinishedEvent - (TaskAttemptID id, TaskType taskType, String taskStatus, - long mapFinishTime, long finishTime, String hostname, int port, - String rackName, String state, Counters counters, int[][] allSplits) { + public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType, + String taskStatus, long mapFinishTime, long finishTime, String hostname, + int port, String rackName, String state, Counters counters, + int[][] allSplits, long startTs) { this.attemptId = id; this.taskType = taskType; this.taskStatus = taskStatus; @@ -98,6 +101,16 @@ public class MapAttemptFinishedEvent implements HistoryEvent { this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits); this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits); this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits); + this.startTime = startTs; + } + + public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType, + String taskStatus, long mapFinishTime, long finishTime, String hostname, + int port, String rackName, String state, Counters counters, + int[][] allSplits) { + this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, port, + rackName, state, counters, allSplits, + SystemClock.getInstance().getTime()); } /** @@ -117,15 +130,13 @@ public class MapAttemptFinishedEvent implements HistoryEvent { * @param counters Counters for the attempt */ @Deprecated - public MapAttemptFinishedEvent - (TaskAttemptID id, TaskType taskType, String taskStatus, - long mapFinishTime, long finishTime, String hostname, - String state, Counters counters) { + public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType, + String taskStatus, long mapFinishTime, long finishTime, String hostname, + String state, Counters counters) { this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, -1, "", state, counters, null); } - - + MapAttemptFinishedEvent() {} public Object getDatum() { @@ -175,38 +186,56 @@ public class MapAttemptFinishedEvent implements HistoryEvent { this.physMemKbytes = AvroArrayUtils.fromAvro(datum.getPhysMemKbytes()); } - /** Get the task ID */ - public TaskID getTaskId() { return attemptId.getTaskID(); } - /** Get the attempt id */ + /** Gets the task ID. */ + public TaskID getTaskId() { + return attemptId.getTaskID(); + } + /** Gets the attempt id. */ public TaskAttemptID getAttemptId() { return attemptId; } - /** Get the task type */ + /** Gets the task type. */ public TaskType getTaskType() { return taskType; } - /** Get the task status */ + /** Gets the task status. */ public String getTaskStatus() { return taskStatus.toString(); } - /** Get the map phase finish time */ + /** Gets the map phase finish time. */ public long getMapFinishTime() { return mapFinishTime; } - /** Get the attempt finish time */ + /** Gets the attempt finish time. */ public long getFinishTime() { return finishTime; } - /** Get the host name */ + /** + * Gets the task attempt start time. + * @return task attempt start time. + */ + public long getStartTime() { + return startTime; + } + /** Gets the host name. */ public String getHostname() { return hostname.toString(); } - /** Get the tracker rpc port */ + /** Gets the tracker rpc port. */ public int getPort() { return port; } - /** Get the rack name */ + /** Gets the rack name. */ public String getRackName() { return rackName == null ? null : rackName.toString(); } - - /** Get the state string */ - public String getState() { return state.toString(); } - /** Get the counters */ - Counters getCounters() { return counters; } - /** Get the event type */ + /** + * Gets the attempt state string. + * @return map attempt state + */ + public String getState() { + return state.toString(); + } + /** + * Gets the counters. + * @return counters + */ + Counters getCounters() { + return counters; + } + /** Gets the event type. */ public EventType getEventType() { return EventType.MAP_ATTEMPT_FINISHED; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java index 9c0f09b017d..5a16f834acb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java @@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.util.SystemClock; /** * Event to record successful completion of a reduce attempt @@ -59,6 +60,7 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent { int[] cpuUsages; int[] vMemKbytes; int[] physMemKbytes; + private long startTime; /** * Create an event to record completion of a reduce attempt @@ -76,13 +78,13 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent { * @param allSplits the "splits", or a pixelated graph of various * measurable worker node state variables against progress. * Currently there are four; wallclock time, CPU time, - * virtual memory and physical memory. + * virtual memory and physical memory. + * @param startTs Task start time to be used for writing entity to ATSv2. */ - public ReduceAttemptFinishedEvent - (TaskAttemptID id, TaskType taskType, String taskStatus, - long shuffleFinishTime, long sortFinishTime, long finishTime, - String hostname, int port, String rackName, String state, - Counters counters, int[][] allSplits) { + public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType, + String taskStatus, long shuffleFinishTime, long sortFinishTime, + long finishTime, String hostname, int port, String rackName, + String state, Counters counters, int[][] allSplits, long startTs) { this.attemptId = id; this.taskType = taskType; this.taskStatus = taskStatus; @@ -99,6 +101,16 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent { this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits); this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits); this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits); + this.startTime = startTs; + } + + public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType, + String taskStatus, long shuffleFinishTime, long sortFinishTime, + long finishTime, String hostname, int port, String rackName, + String state, Counters counters, int[][] allSplits) { + this(id, taskType, taskStatus, shuffleFinishTime, sortFinishTime, + finishTime, hostname, port, rackName, state, counters, allSplits, + SystemClock.getInstance().getTime()); } /** @@ -118,13 +130,12 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent { * @param state State of the attempt * @param counters Counters for the attempt */ - public ReduceAttemptFinishedEvent - (TaskAttemptID id, TaskType taskType, String taskStatus, - long shuffleFinishTime, long sortFinishTime, long finishTime, - String hostname, String state, Counters counters) { + public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType, + String taskStatus, long shuffleFinishTime, long sortFinishTime, + long finishTime, String hostname, String state, Counters counters) { this(id, taskType, taskStatus, - shuffleFinishTime, sortFinishTime, finishTime, - hostname, -1, "", state, counters, null); + shuffleFinishTime, sortFinishTime, finishTime, + hostname, -1, "", state, counters, null); } ReduceAttemptFinishedEvent() {} @@ -178,39 +189,55 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent { this.physMemKbytes = AvroArrayUtils.fromAvro(datum.getPhysMemKbytes()); } - /** Get the Task ID */ + /** Gets the Task ID. */ public TaskID getTaskId() { return attemptId.getTaskID(); } - /** Get the attempt id */ + /** Gets the attempt id. */ public TaskAttemptID getAttemptId() { return attemptId; } - /** Get the task type */ + /** Gets the task type. */ public TaskType getTaskType() { return taskType; } - /** Get the task status */ + /** Gets the task status. */ public String getTaskStatus() { return taskStatus.toString(); } - /** Get the finish time of the sort phase */ + /** Gets the finish time of the sort phase. */ public long getSortFinishTime() { return sortFinishTime; } - /** Get the finish time of the shuffle phase */ + /** Gets the finish time of the shuffle phase. */ public long getShuffleFinishTime() { return shuffleFinishTime; } - /** Get the finish time of the attempt */ + /** Gets the finish time of the attempt. */ public long getFinishTime() { return finishTime; } - /** Get the name of the host where the attempt ran */ + /** + * Gets the start time. + * @return task attempt start time. + */ + public long getStartTime() { + return startTime; + } + /** Gets the name of the host where the attempt ran. */ public String getHostname() { return hostname.toString(); } - /** Get the tracker rpc port */ + /** Gets the tracker rpc port. */ public int getPort() { return port; } - /** Get the rack name of the node where the attempt ran */ + /** Gets the rack name of the node where the attempt ran. */ public String getRackName() { return rackName == null ? null : rackName.toString(); } - - /** Get the state string */ - public String getState() { return state.toString(); } - /** Get the counters for the attempt */ - Counters getCounters() { return counters; } - /** Get the event type */ + /** + * Gets the state string. + * @return reduce attempt state + */ + public String getState() { + return state.toString(); + } + /** + * Gets the counters. + * @return counters + */ + Counters getCounters() { + return counters; + } + /** Gets the event type. */ public EventType getEventType() { return EventType.REDUCE_ATTEMPT_FINISHED; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java index a931ca24033..c28c21605df 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java @@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.util.SystemClock; /** * Event to record successful task completion @@ -50,10 +51,11 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { private String hostname; private String state; private Counters counters; + private long startTime; /** - * Create an event to record successful finishes for setup and cleanup - * attempts + * Create an event to record successful finishes for setup and cleanup + * attempts. * @param id Attempt ID * @param taskType Type of task * @param taskStatus Status of task @@ -61,11 +63,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { * @param hostname Host where the attempt executed * @param state State string * @param counters Counters for the attempt + * @param startTs Task start time to be used for writing entity to ATSv2. */ public TaskAttemptFinishedEvent(TaskAttemptID id, TaskType taskType, String taskStatus, long finishTime, String rackName, - String hostname, String state, Counters counters) { + String hostname, String state, Counters counters, long startTs) { this.attemptId = id; this.taskType = taskType; this.taskStatus = taskStatus; @@ -74,6 +77,14 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { this.hostname = hostname; this.state = state; this.counters = counters; + this.startTime = startTs; + } + + public TaskAttemptFinishedEvent(TaskAttemptID id, TaskType taskType, + String taskStatus, long finishTime, String rackName, String hostname, + String state, Counters counters) { + this(id, taskType, taskStatus, finishTime, rackName, hostname, state, + counters, SystemClock.getInstance().getTime()); } TaskAttemptFinishedEvent() {} @@ -107,33 +118,43 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { this.counters = EventReader.fromAvro(datum.getCounters()); } - /** Get the task ID */ + /** Gets the task ID. */ public TaskID getTaskId() { return attemptId.getTaskID(); } - /** Get the task attempt id */ + /** Gets the task attempt id. */ public TaskAttemptID getAttemptId() { return attemptId; } - /** Get the task type */ + /** Gets the task type. */ public TaskType getTaskType() { return taskType; } - /** Get the task status */ + /** Gets the task status. */ public String getTaskStatus() { return taskStatus.toString(); } - /** Get the attempt finish time */ + /** Gets the attempt finish time. */ public long getFinishTime() { return finishTime; } - /** Get the host where the attempt executed */ + /** + * Gets the task attempt start time to be used while publishing to ATSv2. + * @return task attempt start time. + */ + public long getStartTime() { + return startTime; + } + /** Gets the host where the attempt executed. */ public String getHostname() { return hostname.toString(); } - /** Get the rackname where the attempt executed */ + /** Gets the rackname where the attempt executed. */ public String getRackName() { return rackName == null ? null : rackName.toString(); } - /** Get the state string */ + /** + * Gets the state string. + * @return task attempt state. + */ public String getState() { return state.toString(); } - /** Get the counters for the attempt */ + /** Gets the counters for the attempt. */ Counters getCounters() { return counters; } - /** Get the event type */ + /** Gets the event type. */ public EventType getEventType() { // Note that the task type can be setup/map/reduce/cleanup but the // attempt-type can only be map/reduce. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java index 175296725ff..1529125c2f3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java @@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.util.SystemClock; /** * Event to record unsuccessful (Killed/Failed) completion of task attempts @@ -58,10 +59,11 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent { int[] cpuUsages; int[] vMemKbytes; int[] physMemKbytes; + private long startTime; private static final Counters EMPTY_COUNTERS = new Counters(); - /** - * Create an event to record the unsuccessful completion of attempts + /** + * Create an event to record the unsuccessful completion of attempts. * @param id Attempt ID * @param taskType Type of the task * @param status Status of the attempt @@ -75,12 +77,13 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent { * measurable worker node state variables against progress. * Currently there are four; wallclock time, CPU time, * virtual memory and physical memory. + * @param startTs Task start time to be used for writing entity to ATSv2. */ public TaskAttemptUnsuccessfulCompletionEvent (TaskAttemptID id, TaskType taskType, String status, long finishTime, String hostname, int port, String rackName, - String error, Counters counters, int[][] allSplits) { + String error, Counters counters, int[][] allSplits, long startTs) { this.attemptId = id; this.taskType = taskType; this.status = status; @@ -99,6 +102,15 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent { ProgressSplitsBlock.arrayGetVMemKbytes(allSplits); this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits); + this.startTime = startTs; + } + + public TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID id, + TaskType taskType, String status, long finishTime, String hostname, + int port, String rackName, String error, Counters counters, + int[][] allSplits) { + this(id, taskType, status, finishTime, hostname, port, rackName, error, + counters, allSplits, SystemClock.getInstance().getTime()); } /** @@ -190,39 +202,49 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent { AvroArrayUtils.fromAvro(datum.getPhysMemKbytes()); } - /** Get the task id */ + /** Gets the task id. */ public TaskID getTaskId() { return attemptId.getTaskID(); } - /** Get the task type */ + /** Gets the task type. */ public TaskType getTaskType() { return TaskType.valueOf(taskType.toString()); } - /** Get the attempt id */ + /** Gets the attempt id. */ public TaskAttemptID getTaskAttemptId() { return attemptId; } - /** Get the finish time */ + /** Gets the finish time. */ public long getFinishTime() { return finishTime; } - /** Get the name of the host where the attempt executed */ + /** + * Gets the task attempt start time to be used while publishing to ATSv2. + * @return task attempt start time. + */ + public long getStartTime() { + return startTime; + } + /** Gets the name of the host where the attempt executed. */ public String getHostname() { return hostname; } - /** Get the rpc port for the host where the attempt executed */ + /** Gets the rpc port for the host where the attempt executed. */ public int getPort() { return port; } - - /** Get the rack name of the node where the attempt ran */ + + /** Gets the rack name of the node where the attempt ran. */ public String getRackName() { return rackName == null ? null : rackName.toString(); } - - /** Get the error string */ + + /** Gets the error string. */ public String getError() { return error.toString(); } - /** Get the task status */ + /** + * Gets the task attempt status. + * @return task attempt status. + */ public String getTaskStatus() { return status.toString(); } - /** Get the counters */ + /** Gets the counters. */ Counters getCounters() { return counters; } - /** Get the event type */ + /** Gets the event type. */ public EventType getEventType() { // Note that the task type can be setup/map/reduce/cleanup but the // attempt-type can only be map/reduce. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java index d14350df54f..b4d9e410da2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java @@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.util.SystemClock; /** * Event to record the failure of a task @@ -49,11 +50,12 @@ public class TaskFailedEvent implements HistoryEvent { private String status; private String error; private Counters counters; + private long startTime; private static final Counters EMPTY_COUNTERS = new Counters(); /** - * Create an event to record task failure + * Create an event to record task failure. * @param id Task ID * @param finishTime Finish time of the task * @param taskType Type of the task @@ -61,10 +63,11 @@ public class TaskFailedEvent implements HistoryEvent { * @param status Status * @param failedDueToAttempt The attempt id due to which the task failed * @param counters Counters for the task + * @param startTs task start time. */ public TaskFailedEvent(TaskID id, long finishTime, TaskType taskType, String error, String status, - TaskAttemptID failedDueToAttempt, Counters counters) { + TaskAttemptID failedDueToAttempt, Counters counters, long startTs) { this.id = id; this.finishTime = finishTime; this.taskType = taskType; @@ -72,15 +75,23 @@ public class TaskFailedEvent implements HistoryEvent { this.status = status; this.failedDueToAttempt = failedDueToAttempt; this.counters = counters; + this.startTime = startTs; + } + + public TaskFailedEvent(TaskID id, long finishTime, TaskType taskType, + String error, String status, TaskAttemptID failedDueToAttempt, + Counters counters) { + this(id, finishTime, taskType, error, status, failedDueToAttempt, counters, + SystemClock.getInstance().getTime()); } public TaskFailedEvent(TaskID id, long finishTime, - TaskType taskType, String error, String status, - TaskAttemptID failedDueToAttempt) { - this(id, finishTime, taskType, error, status, - failedDueToAttempt, EMPTY_COUNTERS); + TaskType taskType, String error, String status, + TaskAttemptID failedDueToAttempt) { + this(id, finishTime, taskType, error, status, failedDueToAttempt, + EMPTY_COUNTERS); } - + TaskFailedEvent() {} public Object getDatum() { @@ -118,27 +129,37 @@ public class TaskFailedEvent implements HistoryEvent { EventReader.fromAvro(datum.getCounters()); } - /** Get the task id */ + /** Gets the task id. */ public TaskID getTaskId() { return id; } - /** Get the error string */ + /** Gets the error string. */ public String getError() { return error; } - /** Get the finish time of the attempt */ + /** Gets the finish time of the attempt. */ public long getFinishTime() { return finishTime; } - /** Get the task type */ + /** + * Gets the task start time to be reported to ATSv2. + * @return task start time. + */ + public long getStartTime() { + return startTime; + } + /** Gets the task type. */ public TaskType getTaskType() { return taskType; } - /** Get the attempt id due to which the task failed */ + /** Gets the attempt id due to which the task failed. */ public TaskAttemptID getFailedAttemptID() { return failedDueToAttempt; } - /** Get the task status */ + /** + * Gets the task status. + * @return task status + */ public String getTaskStatus() { return status; } - /** Get task counters */ + /** Gets task counters. */ public Counters getCounters() { return counters; } - /** Get the event type */ + /** Gets the event type. */ public EventType getEventType() { return EventType.TASK_FAILED; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java index 0bc43832636..97557c7e0b4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java @@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.util.SystemClock; /** * Event to record the successful completion of a task @@ -49,27 +50,36 @@ public class TaskFinishedEvent implements HistoryEvent { private TaskType taskType; private String status; private Counters counters; - + private long startTime; + /** - * Create an event to record the successful completion of a task + * Create an event to record the successful completion of a task. * @param id Task ID * @param attemptId Task Attempt ID of the successful attempt for this task * @param finishTime Finish time of the task * @param taskType Type of the task * @param status Status string * @param counters Counters for the task + * @param startTs task start time */ public TaskFinishedEvent(TaskID id, TaskAttemptID attemptId, long finishTime, TaskType taskType, - String status, Counters counters) { + String status, Counters counters, long startTs) { this.taskid = id; this.successfulAttemptId = attemptId; this.finishTime = finishTime; this.taskType = taskType; this.status = status; this.counters = counters; + this.startTime = startTs; } - + + public TaskFinishedEvent(TaskID id, TaskAttemptID attemptId, long finishTime, + TaskType taskType, String status, Counters counters) { + this(id, attemptId, finishTime, taskType, status, counters, + SystemClock.getInstance().getTime()); + } + TaskFinishedEvent() {} public Object getDatum() { @@ -101,23 +111,33 @@ public class TaskFinishedEvent implements HistoryEvent { this.counters = EventReader.fromAvro(datum.getCounters()); } - /** Get task id */ + /** Gets task id. */ public TaskID getTaskId() { return taskid; } - /** Get successful task attempt id */ + /** Gets successful task attempt id. */ public TaskAttemptID getSuccessfulTaskAttemptId() { return successfulAttemptId; } - /** Get the task finish time */ + /** Gets the task finish time. */ public long getFinishTime() { return finishTime; } - /** Get task counters */ + /** + * Gets the task start time to be reported to ATSv2. + * @return task start time + */ + public long getStartTime() { + return startTime; + } + /** Gets task counters. */ public Counters getCounters() { return counters; } - /** Get task type */ + /** Gets task type. */ public TaskType getTaskType() { return taskType; } - /** Get task status */ + /** + * Gets task status. + * @return task status + */ public String getTaskStatus() { return status.toString(); } - /** Get event type */ + /** Gets event type. */ public EventType getEventType() { return EventType.TASK_FINISHED; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java index cbca3c8cdb1..9434d460cd4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java @@ -298,10 +298,10 @@ public class TestMRTimelineEventHandling { " does not exist.", jobEventFile.exists()); verifyEntity(jobEventFile, EventType.JOB_FINISHED.name(), - true, false, null); + true, false, null, false); Set cfgsToCheck = Sets.newHashSet("dummy_conf1", "dummy_conf2", "huge_dummy_conf1", "huge_dummy_conf2"); - verifyEntity(jobEventFile, null, false, true, cfgsToCheck); + verifyEntity(jobEventFile, null, false, true, cfgsToCheck, false); // for this test, we expect MR job metrics are published in YARN_APPLICATION String outputAppDir = @@ -322,8 +322,8 @@ public class TestMRTimelineEventHandling { "appEventFilePath: " + appEventFilePath + " does not exist.", appEventFile.exists()); - verifyEntity(appEventFile, null, true, false, null); - verifyEntity(appEventFile, null, false, true, cfgsToCheck); + verifyEntity(appEventFile, null, true, false, null, false); + verifyEntity(appEventFile, null, false, true, cfgsToCheck, false); // check for task event file String outputDirTask = @@ -344,7 +344,7 @@ public class TestMRTimelineEventHandling { " does not exist.", taskEventFile.exists()); verifyEntity(taskEventFile, EventType.TASK_FINISHED.name(), - true, false, null); + true, false, null, true); // check for task attempt event file String outputDirTaskAttempt = @@ -363,7 +363,7 @@ public class TestMRTimelineEventHandling { Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath + " does not exist.", taskAttemptEventFile.exists()); verifyEntity(taskAttemptEventFile, EventType.MAP_ATTEMPT_FINISHED.name(), - true, false, null); + true, false, null, true); } /** @@ -380,12 +380,13 @@ public class TestMRTimelineEventHandling { * @throws IOException */ private void verifyEntity(File entityFile, String eventId, - boolean chkMetrics, boolean chkCfg, Set cfgsToVerify) - throws IOException { + boolean chkMetrics, boolean chkCfg, Set cfgsToVerify, + boolean checkIdPrefix) throws IOException { BufferedReader reader = null; String strLine; try { reader = new BufferedReader(new FileReader(entityFile)); + long idPrefix = -1; while ((strLine = reader.readLine()) != null) { if (strLine.trim().length() > 0) { org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity @@ -394,6 +395,19 @@ public class TestMRTimelineEventHandling { strLine.trim(), org.apache.hadoop.yarn.api.records.timelineservice. TimelineEntity.class); + + LOG.info("strLine.trim()= " + strLine.trim()); + if (checkIdPrefix) { + Assert.assertTrue("Entity ID prefix expected to be > 0" , + entity.getIdPrefix() > 0); + if (idPrefix == -1) { + idPrefix = entity.getIdPrefix(); + } else { + Assert.assertEquals("Entity ID prefix should be same across " + + "each publish of same entity", + idPrefix, entity.getIdPrefix()); + } + } if (eventId == null) { // Job metrics are published without any events for // ApplicationEntity. There is also possibility that some other diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index ab4607aca6a..a02af709116 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -104,6 +104,8 @@ import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.TimelineServiceHelper; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.log4j.LogManager; @@ -313,6 +315,17 @@ public class ApplicationMaster { protected final Set launchedContainers = Collections.newSetFromMap(new ConcurrentHashMap()); + /** + * Container start times used to set id prefix while publishing entity + * to ATSv2. + */ + private final ConcurrentMap containerStartTimes = + new ConcurrentHashMap(); + + private ConcurrentMap getContainerStartTimes() { + return containerStartTimes; + } + /** * @param args Command line args */ @@ -866,7 +879,15 @@ public class ApplicationMaster { + containerStatus.getContainerId()); } if (timelineServiceV2Enabled) { - publishContainerEndEventOnTimelineServiceV2(containerStatus); + Long containerStartTime = + containerStartTimes.get(containerStatus.getContainerId()); + if (containerStartTime == null) { + containerStartTime = SystemClock.getInstance().getTime(); + containerStartTimes.put(containerStatus.getContainerId(), + containerStartTime); + } + publishContainerEndEventOnTimelineServiceV2(containerStatus, + containerStartTime); } else if (timelineServiceV1Enabled) { publishContainerEndEvent(timelineClient, containerStatus, domainId, appSubmitterUgi); @@ -994,8 +1015,10 @@ public class ApplicationMaster { containerId, container.getNodeId()); } if (applicationMaster.timelineServiceV2Enabled) { - applicationMaster - .publishContainerStartEventOnTimelineServiceV2(container); + long startTime = SystemClock.getInstance().getTime(); + applicationMaster.getContainerStartTimes().put(containerId, startTime); + applicationMaster.publishContainerStartEventOnTimelineServiceV2( + container, startTime); } else if (applicationMaster.timelineServiceV1Enabled) { applicationMaster.publishContainerStartEvent( applicationMaster.timelineClient, container, @@ -1356,24 +1379,24 @@ public class ApplicationMaster { } private void publishContainerStartEventOnTimelineServiceV2( - Container container) { + Container container, long startTime) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice. TimelineEntity(); entity.setId(container.getId().toString()); entity.setType(DSEntity.DS_CONTAINER.toString()); - long ts = System.currentTimeMillis(); - entity.setCreatedTime(ts); + entity.setCreatedTime(startTime); entity.addInfo("user", appSubmitterUgi.getShortUserName()); org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); - event.setTimestamp(ts); + event.setTimestamp(startTime); event.setId(DSEvent.DS_CONTAINER_START.toString()); event.addInfo("Node", container.getNodeId().toString()); event.addInfo("Resources", container.getResource().toString()); entity.addEvent(event); + entity.setIdPrefix(TimelineServiceHelper.invertLong(startTime)); try { appSubmitterUgi.doAs(new PrivilegedExceptionAction() { @@ -1391,7 +1414,7 @@ public class ApplicationMaster { } private void publishContainerEndEventOnTimelineServiceV2( - final ContainerStatus container) { + final ContainerStatus container, long containerStartTime) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice. @@ -1407,6 +1430,7 @@ public class ApplicationMaster { event.addInfo("State", container.getState().name()); event.addInfo("Exit Status", container.getExitStatus()); entity.addEvent(event); + entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime)); try { appSubmitterUgi.doAs(new PrivilegedExceptionAction() { @@ -1441,6 +1465,8 @@ public class ApplicationMaster { event.setId(appEvent.toString()); event.setTimestamp(ts); entity.addEvent(event); + entity.setIdPrefix( + TimelineServiceHelper.invertLong(appAttemptID.getAttemptId())); try { appSubmitterUgi.doAs(new PrivilegedExceptionAction() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index ef21c8786c5..47485aefc77 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -64,7 +64,9 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster.DSEvent; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter; import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient; @@ -81,6 +83,7 @@ import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils; import org.apache.hadoop.yarn.server.timeline.TimelineVersion; import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher; import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; @@ -523,15 +526,31 @@ public class TestDistributedShell { "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId() + "_000001" + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; - verifyEntityTypeFileExists(basePath, "DS_APP_ATTEMPT", - appTimestampFileName); + File dsAppAttemptEntityFile = verifyEntityTypeFileExists(basePath, + "DS_APP_ATTEMPT", appTimestampFileName); + // Check if required events are published and same idprefix is sent for + // on each publish. + verifyEntityForTimelineV2(dsAppAttemptEntityFile, + DSEvent.DS_APP_ATTEMPT_START.toString(), 1, 1, 0, true); + // to avoid race condition of testcase, atleast check 40 times with sleep + // of 50ms + verifyEntityForTimelineV2(dsAppAttemptEntityFile, + DSEvent.DS_APP_ATTEMPT_END.toString(), 1, 40, 50, true); - // Verify DS_CONTAINER entities posted by the client + // Verify DS_CONTAINER entities posted by the client. String containerTimestampFileName = "container_" + appId.getClusterTimestamp() + "_000" + appId.getId() + "_01_000002.thist"; - verifyEntityTypeFileExists(basePath, "DS_CONTAINER", - containerTimestampFileName); + File dsContainerEntityFile = verifyEntityTypeFileExists(basePath, + "DS_CONTAINER", containerTimestampFileName); + // Check if required events are published and same idprefix is sent for + // on each publish. + verifyEntityForTimelineV2(dsContainerEntityFile, + DSEvent.DS_CONTAINER_START.toString(), 1, 1, 0, true); + // to avoid race condition of testcase, atleast check 40 times with sleep + // of 50ms + verifyEntityForTimelineV2(dsContainerEntityFile, + DSEvent.DS_CONTAINER_END.toString(), 1, 40, 50, true); // Verify NM posting container metrics info. String containerMetricsTimestampFileName = @@ -541,29 +560,13 @@ public class TestDistributedShell { File containerEntityFile = verifyEntityTypeFileExists(basePath, TimelineEntityType.YARN_CONTAINER.toString(), containerMetricsTimestampFileName); - Assert.assertEquals( - "Container created event needs to be published atleast once", - 1, - getNumOfStringOccurrences(containerEntityFile, - ContainerMetricsConstants.CREATED_EVENT_TYPE)); + verifyEntityForTimelineV2(containerEntityFile, + ContainerMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, true); - // to avoid race condition of testcase, atleast check 4 times with sleep - // of 500ms - long numOfContainerFinishedOccurrences = 0; - for (int i = 0; i < 4; i++) { - numOfContainerFinishedOccurrences = - getNumOfStringOccurrences(containerEntityFile, - ContainerMetricsConstants.FINISHED_EVENT_TYPE); - if (numOfContainerFinishedOccurrences > 0) { - break; - } else { - Thread.sleep(500L); - } - } - Assert.assertEquals( - "Container finished event needs to be published atleast once", - 1, - numOfContainerFinishedOccurrences); + // to avoid race condition of testcase, atleast check 40 times with sleep + // of 50ms + verifyEntityForTimelineV2(containerEntityFile, + ContainerMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, true); // Verify RM posting Application life cycle Events are getting published String appMetricsTimestampFileName = @@ -573,29 +576,14 @@ public class TestDistributedShell { verifyEntityTypeFileExists(basePath, TimelineEntityType.YARN_APPLICATION.toString(), appMetricsTimestampFileName); - Assert.assertEquals( - "Application created event should be published atleast once", - 1, - getNumOfStringOccurrences(appEntityFile, - ApplicationMetricsConstants.CREATED_EVENT_TYPE)); + // No need to check idprefix for app. + verifyEntityForTimelineV2(appEntityFile, + ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, false); - // to avoid race condition of testcase, atleast check 4 times with sleep - // of 500ms - long numOfStringOccurrences = 0; - for (int i = 0; i < 4; i++) { - numOfStringOccurrences = - getNumOfStringOccurrences(appEntityFile, - ApplicationMetricsConstants.FINISHED_EVENT_TYPE); - if (numOfStringOccurrences > 0) { - break; - } else { - Thread.sleep(500L); - } - } - Assert.assertEquals( - "Application finished event should be published atleast once", - 1, - numOfStringOccurrences); + // to avoid race condition of testcase, atleast check 40 times with sleep + // of 50ms + verifyEntityForTimelineV2(appEntityFile, + ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, false); // Verify RM posting AppAttempt life cycle Events are getting published String appAttemptMetricsTimestampFileName = @@ -606,17 +594,10 @@ public class TestDistributedShell { verifyEntityTypeFileExists(basePath, TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), appAttemptMetricsTimestampFileName); - Assert.assertEquals( - "AppAttempt register event should be published atleast once", - 1, - getNumOfStringOccurrences(appAttemptEntityFile, - AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)); - - Assert.assertEquals( - "AppAttempt finished event should be published atleast once", - 1, - getNumOfStringOccurrences(appAttemptEntityFile, - AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)); + verifyEntityForTimelineV2(appAttemptEntityFile, + AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1, 1, 0, true); + verifyEntityForTimelineV2(appAttemptEntityFile, + AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, 1, 0, true); } finally { FileUtils.deleteDirectory(tmpRootFolder.getParentFile()); } @@ -636,22 +617,64 @@ public class TestDistributedShell { return entityFile; } - private long getNumOfStringOccurrences(File entityFile, String searchString) - throws IOException { - BufferedReader reader = null; - String strLine; + /** + * Checks the events and idprefix published for an entity. + * + * @param entityFile Entity file. + * @param expectedEvent Expected event Id. + * @param numOfExpectedEvent Number of expected occurences of expected event + * id. + * @param checkTimes Number of times to check. + * @param sleepTime Sleep time for each iteration. + * @param checkIdPrefix Whether to check idprefix. + * @throws IOException if entity file reading fails. + * @throws InterruptedException if sleep is interrupted. + */ + private void verifyEntityForTimelineV2(File entityFile, String expectedEvent, + long numOfExpectedEvent, int checkTimes, long sleepTime, + boolean checkIdPrefix) throws IOException, InterruptedException { long actualCount = 0; - try { - reader = new BufferedReader(new FileReader(entityFile)); - while ((strLine = reader.readLine()) != null) { - if (strLine.trim().contains(searchString)) { - actualCount++; + for (int i = 0; i < checkTimes; i++) { + BufferedReader reader = null; + String strLine = null; + actualCount = 0; + try { + reader = new BufferedReader(new FileReader(entityFile)); + long idPrefix = -1; + while ((strLine = reader.readLine()) != null) { + String entityLine = strLine.trim(); + if (entityLine.isEmpty()) { + continue; + } + if (entityLine.contains(expectedEvent)) { + actualCount++; + } + if (checkIdPrefix) { + TimelineEntity entity = FileSystemTimelineReaderImpl. + getTimelineRecordFromJSON(entityLine, TimelineEntity.class); + Assert.assertTrue("Entity ID prefix expected to be > 0" , + entity.getIdPrefix() > 0); + if (idPrefix == -1) { + idPrefix = entity.getIdPrefix(); + } else { + Assert.assertEquals("Entity ID prefix should be same across " + + "each publish of same entity", + idPrefix, entity.getIdPrefix()); + } + } } + } finally { + reader.close(); + } + if (numOfExpectedEvent == actualCount) { + break; + } + if (sleepTime > 0 && i < checkTimes - 1) { + Thread.sleep(sleepTime); } - } finally { - reader.close(); } - return actualCount; + Assert.assertEquals("Unexpected number of " + expectedEvent + + " event published.", numOfExpectedEvent, actualCount); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index ef36ba64408..c7880d53b0c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -155,6 +155,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProv import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; @@ -1052,10 +1053,11 @@ public class ContainerManagerImpl extends CompositeService implements Credentials credentials = YarnServerSecurityUtils.parseCredentials(launchContext); + long containerStartTime = SystemClock.getInstance().getTime(); Container container = new ContainerImpl(getConfig(), this.dispatcher, launchContext, credentials, metrics, containerTokenIdentifier, - context); + context, containerStartTime); ApplicationId applicationID = containerId.getApplicationAttemptId().getApplicationId(); if (context.getContainers().putIfAbsent(containerId, container) != null) { @@ -1112,7 +1114,7 @@ public class ContainerManagerImpl extends CompositeService implements } this.context.getNMStateStore().storeContainer(containerId, - containerTokenIdentifier.getVersion(), request); + containerTokenIdentifier.getVersion(), containerStartTime, request); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java index 0a8ffdff659..09c946b6829 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java @@ -23,12 +23,16 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; public class ApplicationContainerFinishedEvent extends ApplicationEvent { private ContainerStatus containerStatus; + // Required by NMTimelinePublisher. + private long containerStartTime; - public ApplicationContainerFinishedEvent(ContainerStatus containerStatus) { + public ApplicationContainerFinishedEvent(ContainerStatus containerStatus, + long containerStartTs) { super(containerStatus.getContainerId().getApplicationAttemptId(). getApplicationId(), ApplicationEventType.APPLICATION_CONTAINER_FINISHED); this.containerStatus = containerStatus; + this.containerStartTime = containerStartTs; } public ContainerId getContainerID() { @@ -39,4 +43,7 @@ public class ApplicationContainerFinishedEvent extends ApplicationEvent { return containerStatus; } + public long getContainerStartTime() { + return containerStartTime; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index f6e567c19ec..ac9fbb7070c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -37,6 +37,8 @@ public interface Container extends EventHandler { ContainerId getContainerId(); + long getContainerStartTime(); + Resource getResource(); ContainerTokenIdentifier getContainerTokenIdentifier(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 6af8653aae1..772b6e7660f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -173,11 +173,11 @@ public class ContainerImpl implements Container { /** The NM-wide configuration - not specific to this container */ private final Configuration daemonConf; + private final long startTime; private static final Logger LOG = LoggerFactory.getLogger(ContainerImpl.class); - // whether container has been recovered after a restart private RecoveredContainerStatus recoveredStatus = RecoveredContainerStatus.REQUESTED; @@ -190,6 +190,16 @@ public class ContainerImpl implements Container { ContainerLaunchContext launchContext, Credentials creds, NodeManagerMetrics metrics, ContainerTokenIdentifier containerTokenIdentifier, Context context) { + this(conf, dispatcher, launchContext, creds, metrics, + containerTokenIdentifier, context, SystemClock.getInstance().getTime()); + } + + public ContainerImpl(Configuration conf, Dispatcher dispatcher, + ContainerLaunchContext launchContext, Credentials creds, + NodeManagerMetrics metrics, + ContainerTokenIdentifier containerTokenIdentifier, Context context, + long startTs) { + this.startTime = startTs; this.daemonConf = conf; this.dispatcher = dispatcher; this.stateStore = context.getNMStateStore(); @@ -263,7 +273,7 @@ public class ContainerImpl implements Container { ContainerTokenIdentifier containerTokenIdentifier, Context context, RecoveredContainerState rcs) { this(conf, dispatcher, launchContext, creds, metrics, - containerTokenIdentifier, context); + containerTokenIdentifier, context, rcs.getStartTime()); this.recoveredStatus = rcs.getStatus(); this.exitCode = rcs.getExitCode(); this.recoveredAsKilled = rcs.getKilled(); @@ -630,6 +640,11 @@ public class ContainerImpl implements Container { return this.containerId; } + @Override + public long getContainerStartTime() { + return this.startTime; + } + @Override public Resource getResource() { return Resources.clone( @@ -694,7 +709,8 @@ public class ContainerImpl implements Container { EventHandler eventHandler = dispatcher.getEventHandler(); ContainerStatus containerStatus = cloneAndGetContainerStatus(); - eventHandler.handle(new ApplicationContainerFinishedEvent(containerStatus)); + eventHandler.handle( + new ApplicationContainerFinishedEvent(containerStatus, startTime)); // Tell the scheduler the container is Done eventHandler.handle(new ContainerSchedulerEvent(this, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index a2ee38d4a23..a31756e1c95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -112,6 +112,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { "ContainerManager/containers/"; private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request"; private static final String CONTAINER_VERSION_KEY_SUFFIX = "/version"; + private static final String CONTAINER_START_TIME_KEY_SUFFIX = "/starttime"; private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics"; private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched"; private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued"; @@ -257,6 +258,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { StartContainerRequestProto.parseFrom(entry.getValue())); } else if (suffix.equals(CONTAINER_VERSION_KEY_SUFFIX)) { rcs.version = Integer.parseInt(asString(entry.getValue())); + } else if (suffix.equals(CONTAINER_START_TIME_KEY_SUFFIX)) { + rcs.setStartTime(Long.parseLong(asString(entry.getValue()))); } else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) { rcs.diagnostics = asString(entry.getValue()); } else if (suffix.equals(CONTAINER_QUEUED_KEY_SUFFIX)) { @@ -296,21 +299,23 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { @Override public void storeContainer(ContainerId containerId, int containerVersion, - StartContainerRequest startRequest) throws IOException { + long startTime, StartContainerRequest startRequest) throws IOException { String idStr = containerId.toString(); if (LOG.isDebugEnabled()) { LOG.debug("storeContainer: containerId= " + idStr + ", startRequest= " + startRequest); } - String keyRequest = CONTAINERS_KEY_PREFIX + idStr - + CONTAINER_REQUEST_KEY_SUFFIX; + String keyRequest = getContainerKey(idStr, CONTAINER_REQUEST_KEY_SUFFIX); String keyVersion = getContainerVersionKey(idStr); + String keyStartTime = + getContainerKey(idStr, CONTAINER_START_TIME_KEY_SUFFIX); try { WriteBatch batch = db.createWriteBatch(); try { batch.put(bytes(keyRequest), - ((StartContainerRequestPBImpl) startRequest) - .getProto().toByteArray()); + ((StartContainerRequestPBImpl) startRequest).getProto(). + toByteArray()); + batch.put(bytes(keyStartTime), bytes(Long.toString(startTime))); if (containerVersion != 0) { batch.put(bytes(keyVersion), bytes(Integer.toString(containerVersion))); @@ -326,7 +331,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { @VisibleForTesting String getContainerVersionKey(String containerId) { - return CONTAINERS_KEY_PREFIX + containerId + CONTAINER_VERSION_KEY_SUFFIX; + return getContainerKey(containerId, CONTAINER_VERSION_KEY_SUFFIX); + } + + private String getContainerKey(String containerId, String suffix) { + return CONTAINERS_KEY_PREFIX + containerId + suffix; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index 96c3f9e5c6f..86dc99fdeaa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -71,7 +71,7 @@ public class NMNullStateStoreService extends NMStateStoreService { @Override public void storeContainer(ContainerId containerId, int version, - StartContainerRequest startRequest) throws IOException { + long startTime, StartContainerRequest startRequest) throws IOException { } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 9f87279fb25..ec534bff7ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -87,6 +87,7 @@ public abstract class NMStateStoreService extends AbstractService { int version; private RecoveredContainerType recoveryType = RecoveredContainerType.RECOVER; + private long startTime; public RecoveredContainerStatus getStatus() { return status; @@ -108,6 +109,14 @@ public abstract class NMStateStoreService extends AbstractService { return version; } + public long getStartTime() { + return startTime; + } + + public void setStartTime(long ts) { + startTime = ts; + } + public StartContainerRequest getStartRequest() { return startRequest; } @@ -145,6 +154,7 @@ public abstract class NMStateStoreService extends AbstractService { return new StringBuffer("Status: ").append(getStatus()) .append(", Exit code: ").append(exitCode) .append(", Version: ").append(version) + .append(", Start Time: ").append(startTime) .append(", Killed: ").append(getKilled()) .append(", Diagnostics: ").append(getDiagnostics()) .append(", Capability: ").append(getCapability()) @@ -365,11 +375,12 @@ public abstract class NMStateStoreService extends AbstractService { * Record a container start request * @param containerId the container ID * @param containerVersion the container Version + * @param startTime container start time * @param startRequest the container start request * @throws IOException */ public abstract void storeContainer(ContainerId containerId, - int containerVersion, StartContainerRequest startRequest) + int containerVersion, long startTime, StartContainerRequest startRequest) throws IOException; /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java index 23a7e4ff958..c2ac5dc9e74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ContainerMetric; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; +import org.apache.hadoop.yarn.util.TimelineServiceHelper; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import com.google.common.annotations.VisibleForTesting; @@ -149,6 +150,8 @@ public class NMTimelinePublisher extends CompositeService { Math.round(cpuUsagePercentPerCore)); entity.addMetric(cpuMetric); } + entity.setIdPrefix(TimelineServiceHelper. + invertLong(container.getContainerStartTime())); ApplicationId appId = container.getContainerId().getApplicationAttemptId() .getApplicationId(); try { @@ -195,15 +198,17 @@ public class NMTimelinePublisher extends CompositeService { tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE); tEvent.setTimestamp(event.getTimestamp()); + long containerStartTime = container.getContainerStartTime(); entity.addEvent(tEvent); - entity.setCreatedTime(event.getTimestamp()); + entity.setCreatedTime(containerStartTime); + entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime)); dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity, containerId.getApplicationAttemptId().getApplicationId())); } @SuppressWarnings("unchecked") private void publishContainerFinishedEvent(ContainerStatus containerStatus, - long timeStamp) { + long containerFinishTime, long containerStartTime) { ContainerId containerId = containerStatus.getContainerId(); TimelineEntity entity = createContainerEntity(containerId); @@ -215,13 +220,14 @@ public class NMTimelinePublisher extends CompositeService { entityInfo.put(ContainerMetricsConstants.STATE_INFO, ContainerState.COMPLETE.toString()); entityInfo.put(ContainerMetricsConstants.CONTAINER_FINISHED_TIME, - timeStamp); + containerFinishTime); entity.setInfo(entityInfo); TimelineEvent tEvent = new TimelineEvent(); tEvent.setId(ContainerMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(timeStamp); + tEvent.setTimestamp(containerFinishTime); entity.addEvent(tEvent); + entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime)); dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity, containerId.getApplicationAttemptId().getApplicationId())); @@ -237,6 +243,8 @@ public class NMTimelinePublisher extends CompositeService { tEvent.setId(eventType); tEvent.setTimestamp(event.getTimestamp()); entity.addEvent(tEvent); + entity.setIdPrefix(TimelineServiceHelper. + invertLong(container.getContainerStartTime())); ApplicationId appId = container.getContainerId().getApplicationAttemptId().getApplicationId(); @@ -300,7 +308,7 @@ public class NMTimelinePublisher extends CompositeService { ApplicationContainerFinishedEvent evnt = (ApplicationContainerFinishedEvent) event; publishContainerFinishedEvent(evnt.getContainerStatus(), - event.getTimestamp()); + event.getTimestamp(), evnt.getContainerStartTime()); break; default: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index 05ea03641eb..65558e937cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -601,7 +601,7 @@ public class TestApplication { public void containerFinished(int containerNum) { app.handle(new ApplicationContainerFinishedEvent(containers.get( - containerNum).cloneAndGetContainerStatus())); + containerNum).cloneAndGetContainerStatus(), 0)); drainDispatcherEvents(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 0e039947c49..c1638df7b5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -126,10 +126,12 @@ public class NMMemoryStateStoreService extends NMStateStoreService { @Override public synchronized void storeContainer(ContainerId containerId, - int version, StartContainerRequest startRequest) throws IOException { + int version, long startTime, StartContainerRequest startRequest) + throws IOException { RecoveredContainerState rcs = new RecoveredContainerState(); rcs.startRequest = startRequest; rcs.version = version; + rcs.setStartTime(startTime); containerStates.put(containerId, rcs); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 01331560e6c..b0a9bc92b1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -234,7 +234,8 @@ public class TestNMLeveldbStateStoreService { StartContainerRequest containerReq = createContainerRequest(containerId); // store a container and verify recovered - stateStore.storeContainer(containerId, 0, containerReq); + long containerStartTime = System.currentTimeMillis(); + stateStore.storeContainer(containerId, 0, containerStartTime, containerReq); // verify the container version key is not stored for new containers DB db = stateStore.getDB(); @@ -246,6 +247,7 @@ public class TestNMLeveldbStateStoreService { assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); assertEquals(0, rcs.getVersion()); + assertEquals(containerStartTime, rcs.getStartTime()); assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertEquals(false, rcs.getKilled()); @@ -998,7 +1000,7 @@ public class TestNMLeveldbStateStoreService { StartContainerRequest containerReq = StartContainerRequest.newInstance(clc, containerToken); - stateStore.storeContainer(containerId, 0, containerReq); + stateStore.storeContainer(containerId, 0, 0, containerReq); // add a invalid key byte[] invalidKey = ("ContainerManager/containers/" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index 4561e85c87e..57bee8c665e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -235,4 +235,8 @@ public class MockContainer implements Container { public boolean isRecovering() { return false; } + + public long getContainerStartTime() { + return 0; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java index be1dae183ee..0a71a9179bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java @@ -218,8 +218,8 @@ public class TestNMWebServer { Context context = mock(Context.class); Container container = new ContainerImpl(conf, dispatcher, launchContext, - null, metrics, - BuilderUtils.newContainerTokenIdentifier(containerToken), context) { + null, metrics, BuilderUtils.newContainerTokenIdentifier( + containerToken), context) { @Override public ContainerState getContainerState() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 2deec6f6a2f..112c2dda345 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -526,13 +526,6 @@ public class ResourceTrackerService extends AbstractService implements message); } - boolean timelineV2Enabled = - YarnConfiguration.timelineServiceV2Enabled(getConfig()); - if (timelineV2Enabled) { - // Check & update collectors info from request. - updateAppCollectorsMap(request); - } - // Evaluate whether a DECOMMISSIONING node is ready to be DECOMMISSIONED. if (rmNode.getState() == NodeState.DECOMMISSIONING && decommissioningWatcher.checkReadyToBeDecommissioned( @@ -547,6 +540,13 @@ public class ResourceTrackerService extends AbstractService implements NodeAction.SHUTDOWN, message); } + boolean timelineV2Enabled = + YarnConfiguration.timelineServiceV2Enabled(getConfig()); + if (timelineV2Enabled) { + // Check & update collectors info from request. + updateAppCollectorsMap(request); + } + // Heartbeat response NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils .newNodeHeartbeatResponse(lastNodeHeartbeatResponse. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java index a3a2ebc992e..7eaa6e7feb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector; +import org.apache.hadoop.yarn.util.TimelineServiceHelper; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import com.google.common.annotations.VisibleForTesting; @@ -294,8 +295,8 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher { @Override public void appAttemptRegistered(RMAppAttempt appAttempt, long registeredTime) { - TimelineEntity entity = - createAppAttemptEntity(appAttempt.getAppAttemptId()); + ApplicationAttemptId attemptId = appAttempt.getAppAttemptId(); + TimelineEntity entity = createAppAttemptEntity(attemptId); entity.setCreatedTime(registeredTime); TimelineEvent tEvent = new TimelineEvent(); @@ -317,6 +318,8 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher { appAttempt.getMasterContainer().getId().toString()); } entity.setInfo(entityInfo); + entity.setIdPrefix( + TimelineServiceHelper.invertLong(attemptId.getAttemptId())); getDispatcher().getEventHandler().handle( new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY, @@ -327,7 +330,7 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher { @Override public void appAttemptFinished(RMAppAttempt appAttempt, RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) { - + ApplicationAttemptId attemptId = appAttempt.getAppAttemptId(); ApplicationAttemptEntity entity = createAppAttemptEntity(appAttempt.getAppAttemptId()); @@ -346,7 +349,8 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher { entityInfo.put(AppAttemptMetricsConstants.STATE_INFO, RMServerUtils .createApplicationAttemptState(appAttemtpState).toString()); entity.setInfo(entityInfo); - + entity.setIdPrefix( + TimelineServiceHelper.invertLong(attemptId.getAttemptId())); getDispatcher().getEventHandler().handle( new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java index ec099457510..c6bfcc71b23 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java @@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineC import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; +import org.apache.hadoop.yarn.util.TimelineServiceHelper; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.AfterClass; import org.junit.Assert; @@ -216,7 +217,8 @@ public class TestSystemMetricsPublisherForV2 { + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; File appFile = new File(outputDirApp, timelineServiceFileName); Assert.assertTrue(appFile.exists()); - verifyEntity(appFile, 3, ApplicationMetricsConstants.CREATED_EVENT_TYPE, 8); + verifyEntity( + appFile, 3, ApplicationMetricsConstants.CREATED_EVENT_TYPE, 8, 0); } @Test(timeout = 10000) @@ -251,7 +253,7 @@ public class TestSystemMetricsPublisherForV2 { File appFile = new File(outputDirApp, timelineServiceFileName); Assert.assertTrue(appFile.exists()); verifyEntity(appFile, 2, AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, - 0); + 0, TimelineServiceHelper.invertLong(appAttemptId.getAttemptId())); } @Test(timeout = 10000) @@ -283,7 +285,7 @@ public class TestSystemMetricsPublisherForV2 { File appFile = new File(outputDirApp, timelineServiceFileName); Assert.assertTrue(appFile.exists()); verifyEntity(appFile, 2, - ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE, 0); + ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE, 0, 0); } private RMApp createAppAndRegister(ApplicationId appId) { @@ -297,7 +299,8 @@ public class TestSystemMetricsPublisherForV2 { } private static void verifyEntity(File entityFile, long expectedEvents, - String eventForCreatedTime, long expectedMetrics) throws IOException { + String eventForCreatedTime, long expectedMetrics, long idPrefix) + throws IOException { BufferedReader reader = null; String strLine; long count = 0; @@ -309,6 +312,7 @@ public class TestSystemMetricsPublisherForV2 { TimelineEntity entity = FileSystemTimelineReaderImpl. getTimelineRecordFromJSON(strLine.trim(), TimelineEntity.class); metricsCount = entity.getMetrics().size(); + assertEquals(idPrefix, entity.getIdPrefix()); for (TimelineEvent event : entity.getEvents()) { if (event.getId().equals(eventForCreatedTime)) { assertTrue(entity.getCreatedTime() > 0); @@ -394,6 +398,7 @@ public class TestSystemMetricsPublisherForV2 { when(appAttempt.getTrackingUrl()).thenReturn("test tracking url"); when(appAttempt.getOriginalTrackingUrl()).thenReturn( "test original tracking url"); + when(appAttempt.getStartTime()).thenReturn(200L); return appAttempt; }