YARN-5792. Adopt the id prefix for YARN, MR, and DS entities. Contributed by Varun Saxena.

This commit is contained in:
Sangjin Lee 2016-11-21 13:48:35 -08:00 committed by Varun Saxena
parent c92a7ab31c
commit 092fead5d9
30 changed files with 587 additions and 286 deletions

View File

@ -78,6 +78,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.JsonNodeFactory;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientHandlerException;
@ -1124,7 +1127,7 @@ public class JobHistoryEventHandler extends AbstractService
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createTaskEntity(HistoryEvent event, long timestamp, String taskId,
String entityType, String relatedJobEntity, JobId jobId,
boolean setCreatedTime) {
boolean setCreatedTime, long taskIdPrefix) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
createBaseEntity(event, timestamp, entityType, setCreatedTime);
entity.setId(taskId);
@ -1133,6 +1136,7 @@ public class JobHistoryEventHandler extends AbstractService
((TaskStartedEvent)event).getTaskType().toString());
}
entity.addIsRelatedToEntity(relatedJobEntity, jobId.toString());
entity.setIdPrefix(taskIdPrefix);
return entity;
}
@ -1141,11 +1145,12 @@ public class JobHistoryEventHandler extends AbstractService
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createTaskAttemptEntity(HistoryEvent event, long timestamp,
String taskAttemptId, String entityType, String relatedTaskEntity,
String taskId, boolean setCreatedTime) {
String taskId, boolean setCreatedTime, long taskAttemptIdPrefix) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
createBaseEntity(event, timestamp, entityType, setCreatedTime);
entity.setId(taskAttemptId);
entity.addIsRelatedToEntity(relatedTaskEntity, taskId);
entity.setIdPrefix(taskAttemptIdPrefix);
return entity;
}
@ -1196,6 +1201,8 @@ public class JobHistoryEventHandler extends AbstractService
String taskId = null;
String taskAttemptId = null;
boolean setCreatedTime = false;
long taskIdPrefix = 0;
long taskAttemptIdPrefix = 0;
switch (event.getEventType()) {
// Handle job events
@ -1218,15 +1225,21 @@ public class JobHistoryEventHandler extends AbstractService
case TASK_STARTED:
setCreatedTime = true;
taskId = ((TaskStartedEvent)event).getTaskId().toString();
taskIdPrefix = TimelineServiceHelper.
invertLong(((TaskStartedEvent)event).getStartTime());
break;
case TASK_FAILED:
taskId = ((TaskFailedEvent)event).getTaskId().toString();
taskIdPrefix = TimelineServiceHelper.
invertLong(((TaskFailedEvent)event).getStartTime());
break;
case TASK_UPDATED:
taskId = ((TaskUpdatedEvent)event).getTaskId().toString();
break;
case TASK_FINISHED:
taskId = ((TaskFinishedEvent)event).getTaskId().toString();
taskIdPrefix = TimelineServiceHelper.
invertLong(((TaskFinishedEvent)event).getStartTime());
break;
case MAP_ATTEMPT_STARTED:
case REDUCE_ATTEMPT_STARTED:
@ -1234,6 +1247,8 @@ public class JobHistoryEventHandler extends AbstractService
taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptStartedEvent)event).
getTaskAttemptId().toString();
taskAttemptIdPrefix = TimelineServiceHelper.
invertLong(((TaskAttemptStartedEvent)event).getStartTime());
break;
case CLEANUP_ATTEMPT_STARTED:
case SETUP_ATTEMPT_STARTED:
@ -1253,16 +1268,22 @@ public class JobHistoryEventHandler extends AbstractService
getTaskId().toString();
taskAttemptId = ((TaskAttemptUnsuccessfulCompletionEvent)event).
getTaskAttemptId().toString();
taskAttemptIdPrefix = TimelineServiceHelper.invertLong(
((TaskAttemptUnsuccessfulCompletionEvent)event).getStartTime());
break;
case MAP_ATTEMPT_FINISHED:
taskId = ((MapAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((MapAttemptFinishedEvent)event).
getAttemptId().toString();
taskAttemptIdPrefix = TimelineServiceHelper.
invertLong(((MapAttemptFinishedEvent)event).getStartTime());
break;
case REDUCE_ATTEMPT_FINISHED:
taskId = ((ReduceAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((ReduceAttemptFinishedEvent)event).
getAttemptId().toString();
taskAttemptIdPrefix = TimelineServiceHelper.
invertLong(((ReduceAttemptFinishedEvent)event).getStartTime());
break;
case SETUP_ATTEMPT_FINISHED:
case CLEANUP_ATTEMPT_FINISHED:
@ -1291,12 +1312,12 @@ public class JobHistoryEventHandler extends AbstractService
// TaskEntity
tEntity = createTaskEntity(event, timestamp, taskId,
MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE,
jobId, setCreatedTime);
jobId, setCreatedTime, taskIdPrefix);
} else {
// TaskAttemptEntity
tEntity = createTaskAttemptEntity(event, timestamp, taskAttemptId,
MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE, MAPREDUCE_TASK_ENTITY_TYPE,
taskId, setCreatedTime);
taskId, setCreatedTime, taskAttemptIdPrefix);
}
}
try {

View File

@ -1530,7 +1530,7 @@ public abstract class TaskAttemptImpl implements
StringUtils.join(
LINE_SEPARATOR, taskAttempt.getDiagnostics()),
taskAttempt.getCounters(), taskAttempt
.getProgressSplitBlock().burst());
.getProgressSplitBlock().burst(), taskAttempt.launchTime);
return tauce;
}
@ -1953,7 +1953,7 @@ public abstract class TaskAttemptImpl implements
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString,
getCounters(),
getProgressSplitBlock().burst());
getProgressSplitBlock().burst(), launchTime);
eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
} else {
@ -1969,7 +1969,7 @@ public abstract class TaskAttemptImpl implements
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString,
getCounters(),
getProgressSplitBlock().burst());
getProgressSplitBlock().burst(), launchTime);
eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe));
}

View File

@ -139,6 +139,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
private final Set<TaskAttemptId> inProgressAttempts;
private boolean historyTaskStartGenerated = false;
// Launch time reported in history events.
private long launchTime;
private static final SingleArcTransition<TaskImpl, TaskEvent>
ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
@ -705,8 +707,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
}
private void sendTaskStartedEvent() {
launchTime = getLaunchTime();
TaskStartedEvent tse = new TaskStartedEvent(
TypeConverter.fromYarn(taskId), getLaunchTime(),
TypeConverter.fromYarn(taskId), launchTime,
TypeConverter.fromYarn(taskId.getTaskType()),
getSplitsAsString());
eventHandler
@ -714,18 +717,19 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
historyTaskStartGenerated = true;
}
private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task,
TaskStateInternal taskState) {
TaskFinishedEvent tfe =
new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
TypeConverter.fromYarn(task.successfulAttempt),
task.getFinishTime(task.successfulAttempt),
TypeConverter.fromYarn(task.taskId.getTaskType()),
taskState.toString(),
task.getCounters());
taskState.toString(), task.getCounters(), task.launchTime);
return tfe;
}
private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskStateInternal taskState, TaskAttemptId taId) {
private static TaskFailedEvent createTaskFailedEvent(TaskImpl task,
List<String> diag, TaskStateInternal taskState, TaskAttemptId taId) {
StringBuilder errorSb = new StringBuilder();
if (diag != null) {
for (String d : diag) {
@ -740,7 +744,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
errorSb.toString(),
taskState.toString(),
taId == null ? null : TypeConverter.fromYarn(taId),
task.getCounters());
task.getCounters(), task.launchTime);
return taskFailedEvent;
}
@ -861,7 +865,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskFailedEvent tfe = new TaskFailedEvent(taskInfo.getTaskId(),
taskInfo.getFinishTime(), taskInfo.getTaskType(),
taskInfo.getError(), taskInfo.getTaskStatus(),
taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters());
taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters(),
launchTime);
eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
eventHandler.handle(
new JobTaskEvent(taskId, getExternalState(taskState)));

View File

@ -58,7 +58,7 @@ public class TestEvents {
Counters counters = new Counters();
TaskAttemptFinishedEvent test = new TaskAttemptFinishedEvent(taskAttemptId,
TaskType.REDUCE, "TEST", 123L, "RAKNAME", "HOSTNAME", "STATUS",
counters);
counters, 234);
assertEquals(test.getAttemptId().toString(), taskAttemptId.toString());
assertEquals(test.getCounters(), counters);
@ -69,7 +69,7 @@ public class TestEvents {
assertEquals(test.getTaskId(), tid);
assertEquals(test.getTaskStatus(), "TEST");
assertEquals(test.getTaskType(), TaskType.REDUCE);
assertEquals(234, test.getStartTime());
}
/**

View File

@ -148,7 +148,7 @@ public class TestJobHistoryEventHandler {
// First completion event, but min-queue-size for batching flushes is 10
handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
verify(mockWriter).flush();
} finally {
@ -184,7 +184,7 @@ public class TestJobHistoryEventHandler {
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
}
handleNextNEvents(jheh, 9);
@ -229,7 +229,7 @@ public class TestJobHistoryEventHandler {
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
}
handleNextNEvents(jheh, 9);
@ -272,7 +272,7 @@ public class TestJobHistoryEventHandler {
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
}
queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters())));

View File

@ -32,9 +32,10 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record successful completion of a map attempt
* Event to record successful completion of a map attempt.
*
*/
@InterfaceAudience.Private
@ -58,9 +59,10 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
int[] cpuUsages;
int[] vMemKbytes;
int[] physMemKbytes;
private long startTime;
/**
* Create an event for successful completion of map attempts
* Create an event for successful completion of map attempts.
* @param id Task Attempt ID
* @param taskType Type of the task
* @param taskStatus Status of the task
@ -78,11 +80,12 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
*
* If you have no splits data, code {@code null} for this
* parameter.
* @param startTs Task start time to be used for writing entity to ATSv2.
*/
public MapAttemptFinishedEvent
(TaskAttemptID id, TaskType taskType, String taskStatus,
long mapFinishTime, long finishTime, String hostname, int port,
String rackName, String state, Counters counters, int[][] allSplits) {
public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
String taskStatus, long mapFinishTime, long finishTime, String hostname,
int port, String rackName, String state, Counters counters,
int[][] allSplits, long startTs) {
this.attemptId = id;
this.taskType = taskType;
this.taskStatus = taskStatus;
@ -98,6 +101,16 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits);
this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
this.startTime = startTs;
}
public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
String taskStatus, long mapFinishTime, long finishTime, String hostname,
int port, String rackName, String state, Counters counters,
int[][] allSplits) {
this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, port,
rackName, state, counters, allSplits,
SystemClock.getInstance().getTime());
}
/**
@ -117,15 +130,13 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
* @param counters Counters for the attempt
*/
@Deprecated
public MapAttemptFinishedEvent
(TaskAttemptID id, TaskType taskType, String taskStatus,
long mapFinishTime, long finishTime, String hostname,
public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
String taskStatus, long mapFinishTime, long finishTime, String hostname,
String state, Counters counters) {
this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, -1, "",
state, counters, null);
}
MapAttemptFinishedEvent() {}
public Object getDatum() {
@ -175,38 +186,56 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
this.physMemKbytes = AvroArrayUtils.fromAvro(datum.getPhysMemKbytes());
}
/** Get the task ID */
public TaskID getTaskId() { return attemptId.getTaskID(); }
/** Get the attempt id */
/** Gets the task ID. */
public TaskID getTaskId() {
return attemptId.getTaskID();
}
/** Gets the attempt id. */
public TaskAttemptID getAttemptId() {
return attemptId;
}
/** Get the task type */
/** Gets the task type. */
public TaskType getTaskType() {
return taskType;
}
/** Get the task status */
/** Gets the task status. */
public String getTaskStatus() { return taskStatus.toString(); }
/** Get the map phase finish time */
/** Gets the map phase finish time. */
public long getMapFinishTime() { return mapFinishTime; }
/** Get the attempt finish time */
/** Gets the attempt finish time. */
public long getFinishTime() { return finishTime; }
/** Get the host name */
/**
* Gets the task attempt start time.
* @return task attempt start time.
*/
public long getStartTime() {
return startTime;
}
/** Gets the host name. */
public String getHostname() { return hostname.toString(); }
/** Get the tracker rpc port */
/** Gets the tracker rpc port. */
public int getPort() { return port; }
/** Get the rack name */
/** Gets the rack name. */
public String getRackName() {
return rackName == null ? null : rackName.toString();
}
/** Get the state string */
public String getState() { return state.toString(); }
/** Get the counters */
Counters getCounters() { return counters; }
/** Get the event type */
/**
* Gets the attempt state string.
* @return map attempt state
*/
public String getState() {
return state.toString();
}
/**
* Gets the counters.
* @return counters
*/
Counters getCounters() {
return counters;
}
/** Gets the event type. */
public EventType getEventType() {
return EventType.MAP_ATTEMPT_FINISHED;
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record successful completion of a reduce attempt
@ -59,6 +60,7 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
int[] cpuUsages;
int[] vMemKbytes;
int[] physMemKbytes;
private long startTime;
/**
* Create an event to record completion of a reduce attempt
@ -77,12 +79,12 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
* measurable worker node state variables against progress.
* Currently there are four; wallclock time, CPU time,
* virtual memory and physical memory.
* @param startTs Task start time to be used for writing entity to ATSv2.
*/
public ReduceAttemptFinishedEvent
(TaskAttemptID id, TaskType taskType, String taskStatus,
long shuffleFinishTime, long sortFinishTime, long finishTime,
String hostname, int port, String rackName, String state,
Counters counters, int[][] allSplits) {
public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
String taskStatus, long shuffleFinishTime, long sortFinishTime,
long finishTime, String hostname, int port, String rackName,
String state, Counters counters, int[][] allSplits, long startTs) {
this.attemptId = id;
this.taskType = taskType;
this.taskStatus = taskStatus;
@ -99,6 +101,16 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits);
this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
this.startTime = startTs;
}
public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
String taskStatus, long shuffleFinishTime, long sortFinishTime,
long finishTime, String hostname, int port, String rackName,
String state, Counters counters, int[][] allSplits) {
this(id, taskType, taskStatus, shuffleFinishTime, sortFinishTime,
finishTime, hostname, port, rackName, state, counters, allSplits,
SystemClock.getInstance().getTime());
}
/**
@ -118,10 +130,9 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
* @param state State of the attempt
* @param counters Counters for the attempt
*/
public ReduceAttemptFinishedEvent
(TaskAttemptID id, TaskType taskType, String taskStatus,
long shuffleFinishTime, long sortFinishTime, long finishTime,
String hostname, String state, Counters counters) {
public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
String taskStatus, long shuffleFinishTime, long sortFinishTime,
long finishTime, String hostname, String state, Counters counters) {
this(id, taskType, taskStatus,
shuffleFinishTime, sortFinishTime, finishTime,
hostname, -1, "", state, counters, null);
@ -178,39 +189,55 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
this.physMemKbytes = AvroArrayUtils.fromAvro(datum.getPhysMemKbytes());
}
/** Get the Task ID */
/** Gets the Task ID. */
public TaskID getTaskId() { return attemptId.getTaskID(); }
/** Get the attempt id */
/** Gets the attempt id. */
public TaskAttemptID getAttemptId() {
return attemptId;
}
/** Get the task type */
/** Gets the task type. */
public TaskType getTaskType() {
return taskType;
}
/** Get the task status */
/** Gets the task status. */
public String getTaskStatus() { return taskStatus.toString(); }
/** Get the finish time of the sort phase */
/** Gets the finish time of the sort phase. */
public long getSortFinishTime() { return sortFinishTime; }
/** Get the finish time of the shuffle phase */
/** Gets the finish time of the shuffle phase. */
public long getShuffleFinishTime() { return shuffleFinishTime; }
/** Get the finish time of the attempt */
/** Gets the finish time of the attempt. */
public long getFinishTime() { return finishTime; }
/** Get the name of the host where the attempt ran */
/**
* Gets the start time.
* @return task attempt start time.
*/
public long getStartTime() {
return startTime;
}
/** Gets the name of the host where the attempt ran. */
public String getHostname() { return hostname.toString(); }
/** Get the tracker rpc port */
/** Gets the tracker rpc port. */
public int getPort() { return port; }
/** Get the rack name of the node where the attempt ran */
/** Gets the rack name of the node where the attempt ran. */
public String getRackName() {
return rackName == null ? null : rackName.toString();
}
/** Get the state string */
public String getState() { return state.toString(); }
/** Get the counters for the attempt */
Counters getCounters() { return counters; }
/** Get the event type */
/**
* Gets the state string.
* @return reduce attempt state
*/
public String getState() {
return state.toString();
}
/**
* Gets the counters.
* @return counters
*/
Counters getCounters() {
return counters;
}
/** Gets the event type. */
public EventType getEventType() {
return EventType.REDUCE_ATTEMPT_FINISHED;
}

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record successful task completion
@ -50,10 +51,11 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
private String hostname;
private String state;
private Counters counters;
private long startTime;
/**
* Create an event to record successful finishes for setup and cleanup
* attempts
* attempts.
* @param id Attempt ID
* @param taskType Type of task
* @param taskStatus Status of task
@ -61,11 +63,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
* @param hostname Host where the attempt executed
* @param state State string
* @param counters Counters for the attempt
* @param startTs Task start time to be used for writing entity to ATSv2.
*/
public TaskAttemptFinishedEvent(TaskAttemptID id,
TaskType taskType, String taskStatus,
long finishTime, String rackName,
String hostname, String state, Counters counters) {
String hostname, String state, Counters counters, long startTs) {
this.attemptId = id;
this.taskType = taskType;
this.taskStatus = taskStatus;
@ -74,6 +77,14 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
this.hostname = hostname;
this.state = state;
this.counters = counters;
this.startTime = startTs;
}
public TaskAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
String taskStatus, long finishTime, String rackName, String hostname,
String state, Counters counters) {
this(id, taskType, taskStatus, finishTime, rackName, hostname, state,
counters, SystemClock.getInstance().getTime());
}
TaskAttemptFinishedEvent() {}
@ -107,33 +118,43 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
this.counters = EventReader.fromAvro(datum.getCounters());
}
/** Get the task ID */
/** Gets the task ID. */
public TaskID getTaskId() { return attemptId.getTaskID(); }
/** Get the task attempt id */
/** Gets the task attempt id. */
public TaskAttemptID getAttemptId() {
return attemptId;
}
/** Get the task type */
/** Gets the task type. */
public TaskType getTaskType() {
return taskType;
}
/** Get the task status */
/** Gets the task status. */
public String getTaskStatus() { return taskStatus.toString(); }
/** Get the attempt finish time */
/** Gets the attempt finish time. */
public long getFinishTime() { return finishTime; }
/** Get the host where the attempt executed */
/**
* Gets the task attempt start time to be used while publishing to ATSv2.
* @return task attempt start time.
*/
public long getStartTime() {
return startTime;
}
/** Gets the host where the attempt executed. */
public String getHostname() { return hostname.toString(); }
/** Get the rackname where the attempt executed */
/** Gets the rackname where the attempt executed. */
public String getRackName() {
return rackName == null ? null : rackName.toString();
}
/** Get the state string */
/**
* Gets the state string.
* @return task attempt state.
*/
public String getState() { return state.toString(); }
/** Get the counters for the attempt */
/** Gets the counters for the attempt. */
Counters getCounters() { return counters; }
/** Get the event type */
/** Gets the event type. */
public EventType getEventType() {
// Note that the task type can be setup/map/reduce/cleanup but the
// attempt-type can only be map/reduce.

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record unsuccessful (Killed/Failed) completion of task attempts
@ -58,10 +59,11 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
int[] cpuUsages;
int[] vMemKbytes;
int[] physMemKbytes;
private long startTime;
private static final Counters EMPTY_COUNTERS = new Counters();
/**
* Create an event to record the unsuccessful completion of attempts
* Create an event to record the unsuccessful completion of attempts.
* @param id Attempt ID
* @param taskType Type of the task
* @param status Status of the attempt
@ -75,12 +77,13 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
* measurable worker node state variables against progress.
* Currently there are four; wallclock time, CPU time,
* virtual memory and physical memory.
* @param startTs Task start time to be used for writing entity to ATSv2.
*/
public TaskAttemptUnsuccessfulCompletionEvent
(TaskAttemptID id, TaskType taskType,
String status, long finishTime,
String hostname, int port, String rackName,
String error, Counters counters, int[][] allSplits) {
String error, Counters counters, int[][] allSplits, long startTs) {
this.attemptId = id;
this.taskType = taskType;
this.status = status;
@ -99,6 +102,15 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
this.physMemKbytes =
ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
this.startTime = startTs;
}
public TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID id,
TaskType taskType, String status, long finishTime, String hostname,
int port, String rackName, String error, Counters counters,
int[][] allSplits) {
this(id, taskType, status, finishTime, hostname, port, rackName, error,
counters, allSplits, SystemClock.getInstance().getTime());
}
/**
@ -190,39 +202,49 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
AvroArrayUtils.fromAvro(datum.getPhysMemKbytes());
}
/** Get the task id */
/** Gets the task id. */
public TaskID getTaskId() {
return attemptId.getTaskID();
}
/** Get the task type */
/** Gets the task type. */
public TaskType getTaskType() {
return TaskType.valueOf(taskType.toString());
}
/** Get the attempt id */
/** Gets the attempt id. */
public TaskAttemptID getTaskAttemptId() {
return attemptId;
}
/** Get the finish time */
/** Gets the finish time. */
public long getFinishTime() { return finishTime; }
/** Get the name of the host where the attempt executed */
/**
* Gets the task attempt start time to be used while publishing to ATSv2.
* @return task attempt start time.
*/
public long getStartTime() {
return startTime;
}
/** Gets the name of the host where the attempt executed. */
public String getHostname() { return hostname; }
/** Get the rpc port for the host where the attempt executed */
/** Gets the rpc port for the host where the attempt executed. */
public int getPort() { return port; }
/** Get the rack name of the node where the attempt ran */
/** Gets the rack name of the node where the attempt ran. */
public String getRackName() {
return rackName == null ? null : rackName.toString();
}
/** Get the error string */
/** Gets the error string. */
public String getError() { return error.toString(); }
/** Get the task status */
/**
* Gets the task attempt status.
* @return task attempt status.
*/
public String getTaskStatus() {
return status.toString();
}
/** Get the counters */
/** Gets the counters. */
Counters getCounters() { return counters; }
/** Get the event type */
/** Gets the event type. */
public EventType getEventType() {
// Note that the task type can be setup/map/reduce/cleanup but the
// attempt-type can only be map/reduce.

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record the failure of a task
@ -49,11 +50,12 @@ public class TaskFailedEvent implements HistoryEvent {
private String status;
private String error;
private Counters counters;
private long startTime;
private static final Counters EMPTY_COUNTERS = new Counters();
/**
* Create an event to record task failure
* Create an event to record task failure.
* @param id Task ID
* @param finishTime Finish time of the task
* @param taskType Type of the task
@ -61,10 +63,11 @@ public class TaskFailedEvent implements HistoryEvent {
* @param status Status
* @param failedDueToAttempt The attempt id due to which the task failed
* @param counters Counters for the task
* @param startTs task start time.
*/
public TaskFailedEvent(TaskID id, long finishTime,
TaskType taskType, String error, String status,
TaskAttemptID failedDueToAttempt, Counters counters) {
TaskAttemptID failedDueToAttempt, Counters counters, long startTs) {
this.id = id;
this.finishTime = finishTime;
this.taskType = taskType;
@ -72,13 +75,21 @@ public class TaskFailedEvent implements HistoryEvent {
this.status = status;
this.failedDueToAttempt = failedDueToAttempt;
this.counters = counters;
this.startTime = startTs;
}
public TaskFailedEvent(TaskID id, long finishTime, TaskType taskType,
String error, String status, TaskAttemptID failedDueToAttempt,
Counters counters) {
this(id, finishTime, taskType, error, status, failedDueToAttempt, counters,
SystemClock.getInstance().getTime());
}
public TaskFailedEvent(TaskID id, long finishTime,
TaskType taskType, String error, String status,
TaskAttemptID failedDueToAttempt) {
this(id, finishTime, taskType, error, status,
failedDueToAttempt, EMPTY_COUNTERS);
this(id, finishTime, taskType, error, status, failedDueToAttempt,
EMPTY_COUNTERS);
}
TaskFailedEvent() {}
@ -118,27 +129,37 @@ public class TaskFailedEvent implements HistoryEvent {
EventReader.fromAvro(datum.getCounters());
}
/** Get the task id */
/** Gets the task id. */
public TaskID getTaskId() { return id; }
/** Get the error string */
/** Gets the error string. */
public String getError() { return error; }
/** Get the finish time of the attempt */
/** Gets the finish time of the attempt. */
public long getFinishTime() {
return finishTime;
}
/** Get the task type */
/**
* Gets the task start time to be reported to ATSv2.
* @return task start time.
*/
public long getStartTime() {
return startTime;
}
/** Gets the task type. */
public TaskType getTaskType() {
return taskType;
}
/** Get the attempt id due to which the task failed */
/** Gets the attempt id due to which the task failed. */
public TaskAttemptID getFailedAttemptID() {
return failedDueToAttempt;
}
/** Get the task status */
/**
* Gets the task status.
* @return task status
*/
public String getTaskStatus() { return status; }
/** Get task counters */
/** Gets task counters. */
public Counters getCounters() { return counters; }
/** Get the event type */
/** Gets the event type. */
public EventType getEventType() {
return EventType.TASK_FAILED;
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record the successful completion of a task
@ -49,25 +50,34 @@ public class TaskFinishedEvent implements HistoryEvent {
private TaskType taskType;
private String status;
private Counters counters;
private long startTime;
/**
* Create an event to record the successful completion of a task
* Create an event to record the successful completion of a task.
* @param id Task ID
* @param attemptId Task Attempt ID of the successful attempt for this task
* @param finishTime Finish time of the task
* @param taskType Type of the task
* @param status Status string
* @param counters Counters for the task
* @param startTs task start time
*/
public TaskFinishedEvent(TaskID id, TaskAttemptID attemptId, long finishTime,
TaskType taskType,
String status, Counters counters) {
String status, Counters counters, long startTs) {
this.taskid = id;
this.successfulAttemptId = attemptId;
this.finishTime = finishTime;
this.taskType = taskType;
this.status = status;
this.counters = counters;
this.startTime = startTs;
}
public TaskFinishedEvent(TaskID id, TaskAttemptID attemptId, long finishTime,
TaskType taskType, String status, Counters counters) {
this(id, attemptId, finishTime, taskType, status, counters,
SystemClock.getInstance().getTime());
}
TaskFinishedEvent() {}
@ -101,23 +111,33 @@ public class TaskFinishedEvent implements HistoryEvent {
this.counters = EventReader.fromAvro(datum.getCounters());
}
/** Get task id */
/** Gets task id. */
public TaskID getTaskId() { return taskid; }
/** Get successful task attempt id */
/** Gets successful task attempt id. */
public TaskAttemptID getSuccessfulTaskAttemptId() {
return successfulAttemptId;
}
/** Get the task finish time */
/** Gets the task finish time. */
public long getFinishTime() { return finishTime; }
/** Get task counters */
/**
* Gets the task start time to be reported to ATSv2.
* @return task start time
*/
public long getStartTime() {
return startTime;
}
/** Gets task counters. */
public Counters getCounters() { return counters; }
/** Get task type */
/** Gets task type. */
public TaskType getTaskType() {
return taskType;
}
/** Get task status */
/**
* Gets task status.
* @return task status
*/
public String getTaskStatus() { return status.toString(); }
/** Get event type */
/** Gets event type. */
public EventType getEventType() {
return EventType.TASK_FINISHED;
}

View File

@ -298,10 +298,10 @@ public class TestMRTimelineEventHandling {
" does not exist.",
jobEventFile.exists());
verifyEntity(jobEventFile, EventType.JOB_FINISHED.name(),
true, false, null);
true, false, null, false);
Set<String> cfgsToCheck = Sets.newHashSet("dummy_conf1", "dummy_conf2",
"huge_dummy_conf1", "huge_dummy_conf2");
verifyEntity(jobEventFile, null, false, true, cfgsToCheck);
verifyEntity(jobEventFile, null, false, true, cfgsToCheck, false);
// for this test, we expect MR job metrics are published in YARN_APPLICATION
String outputAppDir =
@ -322,8 +322,8 @@ public class TestMRTimelineEventHandling {
"appEventFilePath: " + appEventFilePath +
" does not exist.",
appEventFile.exists());
verifyEntity(appEventFile, null, true, false, null);
verifyEntity(appEventFile, null, false, true, cfgsToCheck);
verifyEntity(appEventFile, null, true, false, null, false);
verifyEntity(appEventFile, null, false, true, cfgsToCheck, false);
// check for task event file
String outputDirTask =
@ -344,7 +344,7 @@ public class TestMRTimelineEventHandling {
" does not exist.",
taskEventFile.exists());
verifyEntity(taskEventFile, EventType.TASK_FINISHED.name(),
true, false, null);
true, false, null, true);
// check for task attempt event file
String outputDirTaskAttempt =
@ -363,7 +363,7 @@ public class TestMRTimelineEventHandling {
Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath +
" does not exist.", taskAttemptEventFile.exists());
verifyEntity(taskAttemptEventFile, EventType.MAP_ATTEMPT_FINISHED.name(),
true, false, null);
true, false, null, true);
}
/**
@ -380,12 +380,13 @@ public class TestMRTimelineEventHandling {
* @throws IOException
*/
private void verifyEntity(File entityFile, String eventId,
boolean chkMetrics, boolean chkCfg, Set<String> cfgsToVerify)
throws IOException {
boolean chkMetrics, boolean chkCfg, Set<String> cfgsToVerify,
boolean checkIdPrefix) throws IOException {
BufferedReader reader = null;
String strLine;
try {
reader = new BufferedReader(new FileReader(entityFile));
long idPrefix = -1;
while ((strLine = reader.readLine()) != null) {
if (strLine.trim().length() > 0) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
@ -394,6 +395,19 @@ public class TestMRTimelineEventHandling {
strLine.trim(),
org.apache.hadoop.yarn.api.records.timelineservice.
TimelineEntity.class);
LOG.info("strLine.trim()= " + strLine.trim());
if (checkIdPrefix) {
Assert.assertTrue("Entity ID prefix expected to be > 0" ,
entity.getIdPrefix() > 0);
if (idPrefix == -1) {
idPrefix = entity.getIdPrefix();
} else {
Assert.assertEquals("Entity ID prefix should be same across " +
"each publish of same entity",
idPrefix, entity.getIdPrefix());
}
}
if (eventId == null) {
// Job metrics are published without any events for
// ApplicationEntity. There is also possibility that some other

View File

@ -104,6 +104,8 @@ import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.log4j.LogManager;
@ -313,6 +315,17 @@ public class ApplicationMaster {
protected final Set<ContainerId> launchedContainers =
Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
/**
* Container start times used to set id prefix while publishing entity
* to ATSv2.
*/
private final ConcurrentMap<ContainerId, Long> containerStartTimes =
new ConcurrentHashMap<ContainerId, Long>();
private ConcurrentMap<ContainerId, Long> getContainerStartTimes() {
return containerStartTimes;
}
/**
* @param args Command line args
*/
@ -866,7 +879,15 @@ public class ApplicationMaster {
+ containerStatus.getContainerId());
}
if (timelineServiceV2Enabled) {
publishContainerEndEventOnTimelineServiceV2(containerStatus);
Long containerStartTime =
containerStartTimes.get(containerStatus.getContainerId());
if (containerStartTime == null) {
containerStartTime = SystemClock.getInstance().getTime();
containerStartTimes.put(containerStatus.getContainerId(),
containerStartTime);
}
publishContainerEndEventOnTimelineServiceV2(containerStatus,
containerStartTime);
} else if (timelineServiceV1Enabled) {
publishContainerEndEvent(timelineClient, containerStatus, domainId,
appSubmitterUgi);
@ -994,8 +1015,10 @@ public class ApplicationMaster {
containerId, container.getNodeId());
}
if (applicationMaster.timelineServiceV2Enabled) {
applicationMaster
.publishContainerStartEventOnTimelineServiceV2(container);
long startTime = SystemClock.getInstance().getTime();
applicationMaster.getContainerStartTimes().put(containerId, startTime);
applicationMaster.publishContainerStartEventOnTimelineServiceV2(
container, startTime);
} else if (applicationMaster.timelineServiceV1Enabled) {
applicationMaster.publishContainerStartEvent(
applicationMaster.timelineClient, container,
@ -1356,24 +1379,24 @@ public class ApplicationMaster {
}
private void publishContainerStartEventOnTimelineServiceV2(
Container container) {
Container container, long startTime) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
entity =
new org.apache.hadoop.yarn.api.records.timelineservice.
TimelineEntity();
entity.setId(container.getId().toString());
entity.setType(DSEntity.DS_CONTAINER.toString());
long ts = System.currentTimeMillis();
entity.setCreatedTime(ts);
entity.setCreatedTime(startTime);
entity.addInfo("user", appSubmitterUgi.getShortUserName());
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
event.setTimestamp(ts);
event.setTimestamp(startTime);
event.setId(DSEvent.DS_CONTAINER_START.toString());
event.addInfo("Node", container.getNodeId().toString());
event.addInfo("Resources", container.getResource().toString());
entity.addEvent(event);
entity.setIdPrefix(TimelineServiceHelper.invertLong(startTime));
try {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@ -1391,7 +1414,7 @@ public class ApplicationMaster {
}
private void publishContainerEndEventOnTimelineServiceV2(
final ContainerStatus container) {
final ContainerStatus container, long containerStartTime) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
entity =
new org.apache.hadoop.yarn.api.records.timelineservice.
@ -1407,6 +1430,7 @@ public class ApplicationMaster {
event.addInfo("State", container.getState().name());
event.addInfo("Exit Status", container.getExitStatus());
entity.addEvent(event);
entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
try {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@ -1441,6 +1465,8 @@ public class ApplicationMaster {
event.setId(appEvent.toString());
event.setTimestamp(ts);
entity.addEvent(event);
entity.setIdPrefix(
TimelineServiceHelper.invertLong(appAttemptID.getAttemptId()));
try {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {

View File

@ -64,7 +64,9 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster.DSEvent;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
@ -81,6 +83,7 @@ import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
@ -523,15 +526,31 @@ public class TestDistributedShell {
"appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+ "_000001"
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
verifyEntityTypeFileExists(basePath, "DS_APP_ATTEMPT",
appTimestampFileName);
File dsAppAttemptEntityFile = verifyEntityTypeFileExists(basePath,
"DS_APP_ATTEMPT", appTimestampFileName);
// Check if required events are published and same idprefix is sent for
// on each publish.
verifyEntityForTimelineV2(dsAppAttemptEntityFile,
DSEvent.DS_APP_ATTEMPT_START.toString(), 1, 1, 0, true);
// to avoid race condition of testcase, atleast check 40 times with sleep
// of 50ms
verifyEntityForTimelineV2(dsAppAttemptEntityFile,
DSEvent.DS_APP_ATTEMPT_END.toString(), 1, 40, 50, true);
// Verify DS_CONTAINER entities posted by the client
// Verify DS_CONTAINER entities posted by the client.
String containerTimestampFileName =
"container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+ "_01_000002.thist";
verifyEntityTypeFileExists(basePath, "DS_CONTAINER",
containerTimestampFileName);
File dsContainerEntityFile = verifyEntityTypeFileExists(basePath,
"DS_CONTAINER", containerTimestampFileName);
// Check if required events are published and same idprefix is sent for
// on each publish.
verifyEntityForTimelineV2(dsContainerEntityFile,
DSEvent.DS_CONTAINER_START.toString(), 1, 1, 0, true);
// to avoid race condition of testcase, atleast check 40 times with sleep
// of 50ms
verifyEntityForTimelineV2(dsContainerEntityFile,
DSEvent.DS_CONTAINER_END.toString(), 1, 40, 50, true);
// Verify NM posting container metrics info.
String containerMetricsTimestampFileName =
@ -541,29 +560,13 @@ public class TestDistributedShell {
File containerEntityFile = verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_CONTAINER.toString(),
containerMetricsTimestampFileName);
Assert.assertEquals(
"Container created event needs to be published atleast once",
1,
getNumOfStringOccurrences(containerEntityFile,
ContainerMetricsConstants.CREATED_EVENT_TYPE));
verifyEntityForTimelineV2(containerEntityFile,
ContainerMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, true);
// to avoid race condition of testcase, atleast check 4 times with sleep
// of 500ms
long numOfContainerFinishedOccurrences = 0;
for (int i = 0; i < 4; i++) {
numOfContainerFinishedOccurrences =
getNumOfStringOccurrences(containerEntityFile,
ContainerMetricsConstants.FINISHED_EVENT_TYPE);
if (numOfContainerFinishedOccurrences > 0) {
break;
} else {
Thread.sleep(500L);
}
}
Assert.assertEquals(
"Container finished event needs to be published atleast once",
1,
numOfContainerFinishedOccurrences);
// to avoid race condition of testcase, atleast check 40 times with sleep
// of 50ms
verifyEntityForTimelineV2(containerEntityFile,
ContainerMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, true);
// Verify RM posting Application life cycle Events are getting published
String appMetricsTimestampFileName =
@ -573,29 +576,14 @@ public class TestDistributedShell {
verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_APPLICATION.toString(),
appMetricsTimestampFileName);
Assert.assertEquals(
"Application created event should be published atleast once",
1,
getNumOfStringOccurrences(appEntityFile,
ApplicationMetricsConstants.CREATED_EVENT_TYPE));
// No need to check idprefix for app.
verifyEntityForTimelineV2(appEntityFile,
ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, false);
// to avoid race condition of testcase, atleast check 4 times with sleep
// of 500ms
long numOfStringOccurrences = 0;
for (int i = 0; i < 4; i++) {
numOfStringOccurrences =
getNumOfStringOccurrences(appEntityFile,
ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
if (numOfStringOccurrences > 0) {
break;
} else {
Thread.sleep(500L);
}
}
Assert.assertEquals(
"Application finished event should be published atleast once",
1,
numOfStringOccurrences);
// to avoid race condition of testcase, atleast check 40 times with sleep
// of 50ms
verifyEntityForTimelineV2(appEntityFile,
ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, false);
// Verify RM posting AppAttempt life cycle Events are getting published
String appAttemptMetricsTimestampFileName =
@ -606,17 +594,10 @@ public class TestDistributedShell {
verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
appAttemptMetricsTimestampFileName);
Assert.assertEquals(
"AppAttempt register event should be published atleast once",
1,
getNumOfStringOccurrences(appAttemptEntityFile,
AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE));
Assert.assertEquals(
"AppAttempt finished event should be published atleast once",
1,
getNumOfStringOccurrences(appAttemptEntityFile,
AppAttemptMetricsConstants.FINISHED_EVENT_TYPE));
verifyEntityForTimelineV2(appAttemptEntityFile,
AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1, 1, 0, true);
verifyEntityForTimelineV2(appAttemptEntityFile,
AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, 1, 0, true);
} finally {
FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
}
@ -636,22 +617,64 @@ public class TestDistributedShell {
return entityFile;
}
private long getNumOfStringOccurrences(File entityFile, String searchString)
throws IOException {
BufferedReader reader = null;
String strLine;
/**
* Checks the events and idprefix published for an entity.
*
* @param entityFile Entity file.
* @param expectedEvent Expected event Id.
* @param numOfExpectedEvent Number of expected occurences of expected event
* id.
* @param checkTimes Number of times to check.
* @param sleepTime Sleep time for each iteration.
* @param checkIdPrefix Whether to check idprefix.
* @throws IOException if entity file reading fails.
* @throws InterruptedException if sleep is interrupted.
*/
private void verifyEntityForTimelineV2(File entityFile, String expectedEvent,
long numOfExpectedEvent, int checkTimes, long sleepTime,
boolean checkIdPrefix) throws IOException, InterruptedException {
long actualCount = 0;
for (int i = 0; i < checkTimes; i++) {
BufferedReader reader = null;
String strLine = null;
actualCount = 0;
try {
reader = new BufferedReader(new FileReader(entityFile));
long idPrefix = -1;
while ((strLine = reader.readLine()) != null) {
if (strLine.trim().contains(searchString)) {
String entityLine = strLine.trim();
if (entityLine.isEmpty()) {
continue;
}
if (entityLine.contains(expectedEvent)) {
actualCount++;
}
if (checkIdPrefix) {
TimelineEntity entity = FileSystemTimelineReaderImpl.
getTimelineRecordFromJSON(entityLine, TimelineEntity.class);
Assert.assertTrue("Entity ID prefix expected to be > 0" ,
entity.getIdPrefix() > 0);
if (idPrefix == -1) {
idPrefix = entity.getIdPrefix();
} else {
Assert.assertEquals("Entity ID prefix should be same across " +
"each publish of same entity",
idPrefix, entity.getIdPrefix());
}
}
}
} finally {
reader.close();
}
return actualCount;
if (numOfExpectedEvent == actualCount) {
break;
}
if (sleepTime > 0 && i < checkTimes - 1) {
Thread.sleep(sleepTime);
}
}
Assert.assertEquals("Unexpected number of " + expectedEvent +
" event published.", numOfExpectedEvent, actualCount);
}
/**

View File

@ -155,6 +155,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProv
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@ -1052,10 +1053,11 @@ public class ContainerManagerImpl extends CompositeService implements
Credentials credentials =
YarnServerSecurityUtils.parseCredentials(launchContext);
long containerStartTime = SystemClock.getInstance().getTime();
Container container =
new ContainerImpl(getConfig(), this.dispatcher,
launchContext, credentials, metrics, containerTokenIdentifier,
context);
context, containerStartTime);
ApplicationId applicationID =
containerId.getApplicationAttemptId().getApplicationId();
if (context.getContainers().putIfAbsent(containerId, container) != null) {
@ -1112,7 +1114,7 @@ public class ContainerManagerImpl extends CompositeService implements
}
this.context.getNMStateStore().storeContainer(containerId,
containerTokenIdentifier.getVersion(), request);
containerTokenIdentifier.getVersion(), containerStartTime, request);
dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container));

View File

@ -23,12 +23,16 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
public class ApplicationContainerFinishedEvent extends ApplicationEvent {
private ContainerStatus containerStatus;
// Required by NMTimelinePublisher.
private long containerStartTime;
public ApplicationContainerFinishedEvent(ContainerStatus containerStatus) {
public ApplicationContainerFinishedEvent(ContainerStatus containerStatus,
long containerStartTs) {
super(containerStatus.getContainerId().getApplicationAttemptId().
getApplicationId(),
ApplicationEventType.APPLICATION_CONTAINER_FINISHED);
this.containerStatus = containerStatus;
this.containerStartTime = containerStartTs;
}
public ContainerId getContainerID() {
@ -39,4 +43,7 @@ public class ApplicationContainerFinishedEvent extends ApplicationEvent {
return containerStatus;
}
public long getContainerStartTime() {
return containerStartTime;
}
}

View File

@ -37,6 +37,8 @@ public interface Container extends EventHandler<ContainerEvent> {
ContainerId getContainerId();
long getContainerStartTime();
Resource getResource();
ContainerTokenIdentifier getContainerTokenIdentifier();

View File

@ -173,11 +173,11 @@ public class ContainerImpl implements Container {
/** The NM-wide configuration - not specific to this container */
private final Configuration daemonConf;
private final long startTime;
private static final Logger LOG =
LoggerFactory.getLogger(ContainerImpl.class);
// whether container has been recovered after a restart
private RecoveredContainerStatus recoveredStatus =
RecoveredContainerStatus.REQUESTED;
@ -190,6 +190,16 @@ public class ContainerImpl implements Container {
ContainerLaunchContext launchContext, Credentials creds,
NodeManagerMetrics metrics,
ContainerTokenIdentifier containerTokenIdentifier, Context context) {
this(conf, dispatcher, launchContext, creds, metrics,
containerTokenIdentifier, context, SystemClock.getInstance().getTime());
}
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds,
NodeManagerMetrics metrics,
ContainerTokenIdentifier containerTokenIdentifier, Context context,
long startTs) {
this.startTime = startTs;
this.daemonConf = conf;
this.dispatcher = dispatcher;
this.stateStore = context.getNMStateStore();
@ -263,7 +273,7 @@ public class ContainerImpl implements Container {
ContainerTokenIdentifier containerTokenIdentifier, Context context,
RecoveredContainerState rcs) {
this(conf, dispatcher, launchContext, creds, metrics,
containerTokenIdentifier, context);
containerTokenIdentifier, context, rcs.getStartTime());
this.recoveredStatus = rcs.getStatus();
this.exitCode = rcs.getExitCode();
this.recoveredAsKilled = rcs.getKilled();
@ -630,6 +640,11 @@ public class ContainerImpl implements Container {
return this.containerId;
}
@Override
public long getContainerStartTime() {
return this.startTime;
}
@Override
public Resource getResource() {
return Resources.clone(
@ -694,7 +709,8 @@ public class ContainerImpl implements Container {
EventHandler eventHandler = dispatcher.getEventHandler();
ContainerStatus containerStatus = cloneAndGetContainerStatus();
eventHandler.handle(new ApplicationContainerFinishedEvent(containerStatus));
eventHandler.handle(
new ApplicationContainerFinishedEvent(containerStatus, startTime));
// Tell the scheduler the container is Done
eventHandler.handle(new ContainerSchedulerEvent(this,

View File

@ -112,6 +112,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
"ContainerManager/containers/";
private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request";
private static final String CONTAINER_VERSION_KEY_SUFFIX = "/version";
private static final String CONTAINER_START_TIME_KEY_SUFFIX = "/starttime";
private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";
private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued";
@ -257,6 +258,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
StartContainerRequestProto.parseFrom(entry.getValue()));
} else if (suffix.equals(CONTAINER_VERSION_KEY_SUFFIX)) {
rcs.version = Integer.parseInt(asString(entry.getValue()));
} else if (suffix.equals(CONTAINER_START_TIME_KEY_SUFFIX)) {
rcs.setStartTime(Long.parseLong(asString(entry.getValue())));
} else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) {
rcs.diagnostics = asString(entry.getValue());
} else if (suffix.equals(CONTAINER_QUEUED_KEY_SUFFIX)) {
@ -296,21 +299,23 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
@Override
public void storeContainer(ContainerId containerId, int containerVersion,
StartContainerRequest startRequest) throws IOException {
long startTime, StartContainerRequest startRequest) throws IOException {
String idStr = containerId.toString();
if (LOG.isDebugEnabled()) {
LOG.debug("storeContainer: containerId= " + idStr
+ ", startRequest= " + startRequest);
}
String keyRequest = CONTAINERS_KEY_PREFIX + idStr
+ CONTAINER_REQUEST_KEY_SUFFIX;
String keyRequest = getContainerKey(idStr, CONTAINER_REQUEST_KEY_SUFFIX);
String keyVersion = getContainerVersionKey(idStr);
String keyStartTime =
getContainerKey(idStr, CONTAINER_START_TIME_KEY_SUFFIX);
try {
WriteBatch batch = db.createWriteBatch();
try {
batch.put(bytes(keyRequest),
((StartContainerRequestPBImpl) startRequest)
.getProto().toByteArray());
((StartContainerRequestPBImpl) startRequest).getProto().
toByteArray());
batch.put(bytes(keyStartTime), bytes(Long.toString(startTime)));
if (containerVersion != 0) {
batch.put(bytes(keyVersion),
bytes(Integer.toString(containerVersion)));
@ -326,7 +331,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
@VisibleForTesting
String getContainerVersionKey(String containerId) {
return CONTAINERS_KEY_PREFIX + containerId + CONTAINER_VERSION_KEY_SUFFIX;
return getContainerKey(containerId, CONTAINER_VERSION_KEY_SUFFIX);
}
private String getContainerKey(String containerId, String suffix) {
return CONTAINERS_KEY_PREFIX + containerId + suffix;
}
@Override

View File

@ -71,7 +71,7 @@ public class NMNullStateStoreService extends NMStateStoreService {
@Override
public void storeContainer(ContainerId containerId, int version,
StartContainerRequest startRequest) throws IOException {
long startTime, StartContainerRequest startRequest) throws IOException {
}
@Override

View File

@ -87,6 +87,7 @@ public abstract class NMStateStoreService extends AbstractService {
int version;
private RecoveredContainerType recoveryType =
RecoveredContainerType.RECOVER;
private long startTime;
public RecoveredContainerStatus getStatus() {
return status;
@ -108,6 +109,14 @@ public abstract class NMStateStoreService extends AbstractService {
return version;
}
public long getStartTime() {
return startTime;
}
public void setStartTime(long ts) {
startTime = ts;
}
public StartContainerRequest getStartRequest() {
return startRequest;
}
@ -145,6 +154,7 @@ public abstract class NMStateStoreService extends AbstractService {
return new StringBuffer("Status: ").append(getStatus())
.append(", Exit code: ").append(exitCode)
.append(", Version: ").append(version)
.append(", Start Time: ").append(startTime)
.append(", Killed: ").append(getKilled())
.append(", Diagnostics: ").append(getDiagnostics())
.append(", Capability: ").append(getCapability())
@ -365,11 +375,12 @@ public abstract class NMStateStoreService extends AbstractService {
* Record a container start request
* @param containerId the container ID
* @param containerVersion the container Version
* @param startTime container start time
* @param startRequest the container start request
* @throws IOException
*/
public abstract void storeContainer(ContainerId containerId,
int containerVersion, StartContainerRequest startRequest)
int containerVersion, long startTime, StartContainerRequest startRequest)
throws IOException;
/**

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ContainerMetric;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting;
@ -149,6 +150,8 @@ public class NMTimelinePublisher extends CompositeService {
Math.round(cpuUsagePercentPerCore));
entity.addMetric(cpuMetric);
}
entity.setIdPrefix(TimelineServiceHelper.
invertLong(container.getContainerStartTime()));
ApplicationId appId = container.getContainerId().getApplicationAttemptId()
.getApplicationId();
try {
@ -195,15 +198,17 @@ public class NMTimelinePublisher extends CompositeService {
tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
long containerStartTime = container.getContainerStartTime();
entity.addEvent(tEvent);
entity.setCreatedTime(event.getTimestamp());
entity.setCreatedTime(containerStartTime);
entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
containerId.getApplicationAttemptId().getApplicationId()));
}
@SuppressWarnings("unchecked")
private void publishContainerFinishedEvent(ContainerStatus containerStatus,
long timeStamp) {
long containerFinishTime, long containerStartTime) {
ContainerId containerId = containerStatus.getContainerId();
TimelineEntity entity = createContainerEntity(containerId);
@ -215,13 +220,14 @@ public class NMTimelinePublisher extends CompositeService {
entityInfo.put(ContainerMetricsConstants.STATE_INFO,
ContainerState.COMPLETE.toString());
entityInfo.put(ContainerMetricsConstants.CONTAINER_FINISHED_TIME,
timeStamp);
containerFinishTime);
entity.setInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(timeStamp);
tEvent.setTimestamp(containerFinishTime);
entity.addEvent(tEvent);
entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
containerId.getApplicationAttemptId().getApplicationId()));
@ -237,6 +243,8 @@ public class NMTimelinePublisher extends CompositeService {
tEvent.setId(eventType);
tEvent.setTimestamp(event.getTimestamp());
entity.addEvent(tEvent);
entity.setIdPrefix(TimelineServiceHelper.
invertLong(container.getContainerStartTime()));
ApplicationId appId =
container.getContainerId().getApplicationAttemptId().getApplicationId();
@ -300,7 +308,7 @@ public class NMTimelinePublisher extends CompositeService {
ApplicationContainerFinishedEvent evnt =
(ApplicationContainerFinishedEvent) event;
publishContainerFinishedEvent(evnt.getContainerStatus(),
event.getTimestamp());
event.getTimestamp(), evnt.getContainerStartTime());
break;
default:

View File

@ -601,7 +601,7 @@ public class TestApplication {
public void containerFinished(int containerNum) {
app.handle(new ApplicationContainerFinishedEvent(containers.get(
containerNum).cloneAndGetContainerStatus()));
containerNum).cloneAndGetContainerStatus(), 0));
drainDispatcherEvents();
}

View File

@ -126,10 +126,12 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
@Override
public synchronized void storeContainer(ContainerId containerId,
int version, StartContainerRequest startRequest) throws IOException {
int version, long startTime, StartContainerRequest startRequest)
throws IOException {
RecoveredContainerState rcs = new RecoveredContainerState();
rcs.startRequest = startRequest;
rcs.version = version;
rcs.setStartTime(startTime);
containerStates.put(containerId, rcs);
}

View File

@ -234,7 +234,8 @@ public class TestNMLeveldbStateStoreService {
StartContainerRequest containerReq = createContainerRequest(containerId);
// store a container and verify recovered
stateStore.storeContainer(containerId, 0, containerReq);
long containerStartTime = System.currentTimeMillis();
stateStore.storeContainer(containerId, 0, containerStartTime, containerReq);
// verify the container version key is not stored for new containers
DB db = stateStore.getDB();
@ -246,6 +247,7 @@ public class TestNMLeveldbStateStoreService {
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(0, rcs.getVersion());
assertEquals(containerStartTime, rcs.getStartTime());
assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
@ -998,7 +1000,7 @@ public class TestNMLeveldbStateStoreService {
StartContainerRequest containerReq = StartContainerRequest.newInstance(clc,
containerToken);
stateStore.storeContainer(containerId, 0, containerReq);
stateStore.storeContainer(containerId, 0, 0, containerReq);
// add a invalid key
byte[] invalidKey = ("ContainerManager/containers/"

View File

@ -235,4 +235,8 @@ public class MockContainer implements Container {
public boolean isRecovering() {
return false;
}
public long getContainerStartTime() {
return 0;
}
}

View File

@ -218,8 +218,8 @@ public class TestNMWebServer {
Context context = mock(Context.class);
Container container =
new ContainerImpl(conf, dispatcher, launchContext,
null, metrics,
BuilderUtils.newContainerTokenIdentifier(containerToken), context) {
null, metrics, BuilderUtils.newContainerTokenIdentifier(
containerToken), context) {
@Override
public ContainerState getContainerState() {

View File

@ -526,13 +526,6 @@ public class ResourceTrackerService extends AbstractService implements
message);
}
boolean timelineV2Enabled =
YarnConfiguration.timelineServiceV2Enabled(getConfig());
if (timelineV2Enabled) {
// Check & update collectors info from request.
updateAppCollectorsMap(request);
}
// Evaluate whether a DECOMMISSIONING node is ready to be DECOMMISSIONED.
if (rmNode.getState() == NodeState.DECOMMISSIONING &&
decommissioningWatcher.checkReadyToBeDecommissioned(
@ -547,6 +540,13 @@ public class ResourceTrackerService extends AbstractService implements
NodeAction.SHUTDOWN, message);
}
boolean timelineV2Enabled =
YarnConfiguration.timelineServiceV2Enabled(getConfig());
if (timelineV2Enabled) {
// Check & update collectors info from request.
updateAppCollectorsMap(request);
}
// Heartbeat response
NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
.newNodeHeartbeatResponse(lastNodeHeartbeatResponse.

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting;
@ -294,8 +295,8 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
@Override
public void appAttemptRegistered(RMAppAttempt appAttempt,
long registeredTime) {
TimelineEntity entity =
createAppAttemptEntity(appAttempt.getAppAttemptId());
ApplicationAttemptId attemptId = appAttempt.getAppAttemptId();
TimelineEntity entity = createAppAttemptEntity(attemptId);
entity.setCreatedTime(registeredTime);
TimelineEvent tEvent = new TimelineEvent();
@ -317,6 +318,8 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
appAttempt.getMasterContainer().getId().toString());
}
entity.setInfo(entityInfo);
entity.setIdPrefix(
TimelineServiceHelper.invertLong(attemptId.getAttemptId()));
getDispatcher().getEventHandler().handle(
new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
@ -327,7 +330,7 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
@Override
public void appAttemptFinished(RMAppAttempt appAttempt,
RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
ApplicationAttemptId attemptId = appAttempt.getAppAttemptId();
ApplicationAttemptEntity entity =
createAppAttemptEntity(appAttempt.getAppAttemptId());
@ -346,7 +349,8 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
entityInfo.put(AppAttemptMetricsConstants.STATE_INFO, RMServerUtils
.createApplicationAttemptState(appAttemtpState).toString());
entity.setInfo(entityInfo);
entity.setIdPrefix(
TimelineServiceHelper.invertLong(attemptId.getAttemptId()));
getDispatcher().getEventHandler().handle(
new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,

View File

@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineC
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.AfterClass;
import org.junit.Assert;
@ -216,7 +217,8 @@ public class TestSystemMetricsPublisherForV2 {
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
File appFile = new File(outputDirApp, timelineServiceFileName);
Assert.assertTrue(appFile.exists());
verifyEntity(appFile, 3, ApplicationMetricsConstants.CREATED_EVENT_TYPE, 8);
verifyEntity(
appFile, 3, ApplicationMetricsConstants.CREATED_EVENT_TYPE, 8, 0);
}
@Test(timeout = 10000)
@ -251,7 +253,7 @@ public class TestSystemMetricsPublisherForV2 {
File appFile = new File(outputDirApp, timelineServiceFileName);
Assert.assertTrue(appFile.exists());
verifyEntity(appFile, 2, AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE,
0);
0, TimelineServiceHelper.invertLong(appAttemptId.getAttemptId()));
}
@Test(timeout = 10000)
@ -283,7 +285,7 @@ public class TestSystemMetricsPublisherForV2 {
File appFile = new File(outputDirApp, timelineServiceFileName);
Assert.assertTrue(appFile.exists());
verifyEntity(appFile, 2,
ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE, 0);
ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE, 0, 0);
}
private RMApp createAppAndRegister(ApplicationId appId) {
@ -297,7 +299,8 @@ public class TestSystemMetricsPublisherForV2 {
}
private static void verifyEntity(File entityFile, long expectedEvents,
String eventForCreatedTime, long expectedMetrics) throws IOException {
String eventForCreatedTime, long expectedMetrics, long idPrefix)
throws IOException {
BufferedReader reader = null;
String strLine;
long count = 0;
@ -309,6 +312,7 @@ public class TestSystemMetricsPublisherForV2 {
TimelineEntity entity = FileSystemTimelineReaderImpl.
getTimelineRecordFromJSON(strLine.trim(), TimelineEntity.class);
metricsCount = entity.getMetrics().size();
assertEquals(idPrefix, entity.getIdPrefix());
for (TimelineEvent event : entity.getEvents()) {
if (event.getId().equals(eventForCreatedTime)) {
assertTrue(entity.getCreatedTime() > 0);
@ -394,6 +398,7 @@ public class TestSystemMetricsPublisherForV2 {
when(appAttempt.getTrackingUrl()).thenReturn("test tracking url");
when(appAttempt.getOriginalTrackingUrl()).thenReturn(
"test original tracking url");
when(appAttempt.getStartTime()).thenReturn(200L);
return appAttempt;
}