YARN-5792. Adopt the id prefix for YARN, MR, and DS entities. Contributed by Varun Saxena.

(cherry picked from commit f734977b27a514ce0561638c0a6a17b1ef093026)
This commit is contained in:
Sangjin Lee 2016-11-21 13:48:35 -08:00 committed by Varun Saxena
parent 684c2d149f
commit b8cfb4fcb3
28 changed files with 576 additions and 276 deletions

View File

@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.JsonNodeFactory;
@ -1120,7 +1121,7 @@ public class JobHistoryEventHandler extends AbstractService
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createTaskEntity(HistoryEvent event, long timestamp, String taskId,
String entityType, String relatedJobEntity, JobId jobId,
boolean setCreatedTime) {
boolean setCreatedTime, long taskIdPrefix) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
createBaseEntity(event, timestamp, entityType, setCreatedTime);
entity.setId(taskId);
@ -1129,6 +1130,7 @@ public class JobHistoryEventHandler extends AbstractService
((TaskStartedEvent)event).getTaskType().toString());
}
entity.addIsRelatedToEntity(relatedJobEntity, jobId.toString());
entity.setIdPrefix(taskIdPrefix);
return entity;
}
@ -1137,11 +1139,12 @@ public class JobHistoryEventHandler extends AbstractService
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createTaskAttemptEntity(HistoryEvent event, long timestamp,
String taskAttemptId, String entityType, String relatedTaskEntity,
String taskId, boolean setCreatedTime) {
String taskId, boolean setCreatedTime, long taskAttemptIdPrefix) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
createBaseEntity(event, timestamp, entityType, setCreatedTime);
entity.setId(taskAttemptId);
entity.addIsRelatedToEntity(relatedTaskEntity, taskId);
entity.setIdPrefix(taskAttemptIdPrefix);
return entity;
}
@ -1192,6 +1195,8 @@ public class JobHistoryEventHandler extends AbstractService
String taskId = null;
String taskAttemptId = null;
boolean setCreatedTime = false;
long taskIdPrefix = 0;
long taskAttemptIdPrefix = 0;
switch (event.getEventType()) {
// Handle job events
@ -1214,15 +1219,21 @@ public class JobHistoryEventHandler extends AbstractService
case TASK_STARTED:
setCreatedTime = true;
taskId = ((TaskStartedEvent)event).getTaskId().toString();
taskIdPrefix = TimelineServiceHelper.
invertLong(((TaskStartedEvent)event).getStartTime());
break;
case TASK_FAILED:
taskId = ((TaskFailedEvent)event).getTaskId().toString();
taskIdPrefix = TimelineServiceHelper.
invertLong(((TaskFailedEvent)event).getStartTime());
break;
case TASK_UPDATED:
taskId = ((TaskUpdatedEvent)event).getTaskId().toString();
break;
case TASK_FINISHED:
taskId = ((TaskFinishedEvent)event).getTaskId().toString();
taskIdPrefix = TimelineServiceHelper.
invertLong(((TaskFinishedEvent)event).getStartTime());
break;
case MAP_ATTEMPT_STARTED:
case REDUCE_ATTEMPT_STARTED:
@ -1230,6 +1241,8 @@ public class JobHistoryEventHandler extends AbstractService
taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptStartedEvent)event).
getTaskAttemptId().toString();
taskAttemptIdPrefix = TimelineServiceHelper.
invertLong(((TaskAttemptStartedEvent)event).getStartTime());
break;
case CLEANUP_ATTEMPT_STARTED:
case SETUP_ATTEMPT_STARTED:
@ -1249,16 +1262,22 @@ public class JobHistoryEventHandler extends AbstractService
getTaskId().toString();
taskAttemptId = ((TaskAttemptUnsuccessfulCompletionEvent)event).
getTaskAttemptId().toString();
taskAttemptIdPrefix = TimelineServiceHelper.invertLong(
((TaskAttemptUnsuccessfulCompletionEvent)event).getStartTime());
break;
case MAP_ATTEMPT_FINISHED:
taskId = ((MapAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((MapAttemptFinishedEvent)event).
getAttemptId().toString();
taskAttemptIdPrefix = TimelineServiceHelper.
invertLong(((MapAttemptFinishedEvent)event).getStartTime());
break;
case REDUCE_ATTEMPT_FINISHED:
taskId = ((ReduceAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((ReduceAttemptFinishedEvent)event).
getAttemptId().toString();
taskAttemptIdPrefix = TimelineServiceHelper.
invertLong(((ReduceAttemptFinishedEvent)event).getStartTime());
break;
case SETUP_ATTEMPT_FINISHED:
case CLEANUP_ATTEMPT_FINISHED:
@ -1287,12 +1306,12 @@ public class JobHistoryEventHandler extends AbstractService
// TaskEntity
tEntity = createTaskEntity(event, timestamp, taskId,
MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE,
jobId, setCreatedTime);
jobId, setCreatedTime, taskIdPrefix);
} else {
// TaskAttemptEntity
tEntity = createTaskAttemptEntity(event, timestamp, taskAttemptId,
MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE, MAPREDUCE_TASK_ENTITY_TYPE,
taskId, setCreatedTime);
taskId, setCreatedTime, taskAttemptIdPrefix);
}
}
try {

View File

@ -1580,7 +1580,7 @@ public abstract class TaskAttemptImpl implements
StringUtils.join(
LINE_SEPARATOR, taskAttempt.getDiagnostics()),
taskAttempt.getCounters(), taskAttempt
.getProgressSplitBlock().burst());
.getProgressSplitBlock().burst(), taskAttempt.launchTime);
return tauce;
}
@ -1993,35 +1993,35 @@ public abstract class TaskAttemptImpl implements
this.container == null ? -1 : this.container.getNodeId().getPort();
if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
MapAttemptFinishedEvent mfe =
new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
state.toString(),
this.reportedStatus.mapFinishTime,
finishTime,
containerHostName,
containerNodePort,
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString,
getCounters(),
getProgressSplitBlock().burst());
eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
state.toString(),
this.reportedStatus.mapFinishTime,
finishTime,
containerHostName,
containerNodePort,
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString,
getCounters(),
getProgressSplitBlock().burst(), launchTime);
eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
} else {
ReduceAttemptFinishedEvent rfe =
new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
state.toString(),
this.reportedStatus.shuffleFinishTime,
this.reportedStatus.sortFinishTime,
finishTime,
containerHostName,
containerNodePort,
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString,
getCounters(),
getProgressSplitBlock().burst());
eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe));
ReduceAttemptFinishedEvent rfe =
new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
state.toString(),
this.reportedStatus.shuffleFinishTime,
this.reportedStatus.sortFinishTime,
finishTime,
containerHostName,
containerNodePort,
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString,
getCounters(),
getProgressSplitBlock().burst(), launchTime);
eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe));
}
}

View File

@ -139,6 +139,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
private final Set<TaskAttemptId> inProgressAttempts;
private boolean historyTaskStartGenerated = false;
// Launch time reported in history events.
private long launchTime;
private static final SingleArcTransition<TaskImpl, TaskEvent>
ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
@ -705,8 +707,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
}
private void sendTaskStartedEvent() {
launchTime = getLaunchTime();
TaskStartedEvent tse = new TaskStartedEvent(
TypeConverter.fromYarn(taskId), getLaunchTime(),
TypeConverter.fromYarn(taskId), launchTime,
TypeConverter.fromYarn(taskId.getTaskType()),
getSplitsAsString());
eventHandler
@ -714,18 +717,19 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
historyTaskStartGenerated = true;
}
private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task,
TaskStateInternal taskState) {
TaskFinishedEvent tfe =
new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
TypeConverter.fromYarn(task.successfulAttempt),
task.getFinishTime(task.successfulAttempt),
TypeConverter.fromYarn(task.taskId.getTaskType()),
taskState.toString(),
task.getCounters());
taskState.toString(), task.getCounters(), task.launchTime);
return tfe;
}
private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskStateInternal taskState, TaskAttemptId taId) {
private static TaskFailedEvent createTaskFailedEvent(TaskImpl task,
List<String> diag, TaskStateInternal taskState, TaskAttemptId taId) {
StringBuilder errorSb = new StringBuilder();
if (diag != null) {
for (String d : diag) {
@ -740,7 +744,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
errorSb.toString(),
taskState.toString(),
taId == null ? null : TypeConverter.fromYarn(taId),
task.getCounters());
task.getCounters(), task.launchTime);
return taskFailedEvent;
}
@ -861,7 +865,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskFailedEvent tfe = new TaskFailedEvent(taskInfo.getTaskId(),
taskInfo.getFinishTime(), taskInfo.getTaskType(),
taskInfo.getError(), taskInfo.getTaskStatus(),
taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters());
taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters(),
launchTime);
eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
eventHandler.handle(
new JobTaskEvent(taskId, getExternalState(taskState)));

View File

@ -58,7 +58,7 @@ public class TestEvents {
Counters counters = new Counters();
TaskAttemptFinishedEvent test = new TaskAttemptFinishedEvent(taskAttemptId,
TaskType.REDUCE, "TEST", 123L, "RAKNAME", "HOSTNAME", "STATUS",
counters);
counters, 234);
assertEquals(test.getAttemptId().toString(), taskAttemptId.toString());
assertEquals(test.getCounters(), counters);
@ -69,7 +69,7 @@ public class TestEvents {
assertEquals(test.getTaskId(), tid);
assertEquals(test.getTaskStatus(), "TEST");
assertEquals(test.getTaskType(), TaskType.REDUCE);
assertEquals(234, test.getStartTime());
}
/**

View File

@ -146,7 +146,7 @@ public class TestJobHistoryEventHandler {
// First completion event, but min-queue-size for batching flushes is 10
handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
verify(mockWriter).flush();
} finally {
@ -182,7 +182,7 @@ public class TestJobHistoryEventHandler {
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
}
handleNextNEvents(jheh, 9);
@ -227,7 +227,7 @@ public class TestJobHistoryEventHandler {
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
}
handleNextNEvents(jheh, 9);
@ -270,7 +270,7 @@ public class TestJobHistoryEventHandler {
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
}
queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters())));

View File

@ -32,9 +32,10 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record successful completion of a map attempt
* Event to record successful completion of a map attempt.
*
*/
@InterfaceAudience.Private
@ -58,9 +59,10 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
int[] cpuUsages;
int[] vMemKbytes;
int[] physMemKbytes;
private long startTime;
/**
* Create an event for successful completion of map attempts
* Create an event for successful completion of map attempts.
* @param id Task Attempt ID
* @param taskType Type of the task
* @param taskStatus Status of the task
@ -77,12 +79,13 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
* virtual memory and physical memory.
*
* If you have no splits data, code {@code null} for this
* parameter.
* parameter.
* @param startTs Task start time to be used for writing entity to ATSv2.
*/
public MapAttemptFinishedEvent
(TaskAttemptID id, TaskType taskType, String taskStatus,
long mapFinishTime, long finishTime, String hostname, int port,
String rackName, String state, Counters counters, int[][] allSplits) {
public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
String taskStatus, long mapFinishTime, long finishTime, String hostname,
int port, String rackName, String state, Counters counters,
int[][] allSplits, long startTs) {
this.attemptId = id;
this.taskType = taskType;
this.taskStatus = taskStatus;
@ -98,6 +101,16 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits);
this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
this.startTime = startTs;
}
public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
String taskStatus, long mapFinishTime, long finishTime, String hostname,
int port, String rackName, String state, Counters counters,
int[][] allSplits) {
this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, port,
rackName, state, counters, allSplits,
SystemClock.getInstance().getTime());
}
/**
@ -117,15 +130,13 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
* @param counters Counters for the attempt
*/
@Deprecated
public MapAttemptFinishedEvent
(TaskAttemptID id, TaskType taskType, String taskStatus,
long mapFinishTime, long finishTime, String hostname,
String state, Counters counters) {
public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
String taskStatus, long mapFinishTime, long finishTime, String hostname,
String state, Counters counters) {
this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, -1, "",
state, counters, null);
}
MapAttemptFinishedEvent() {}
public Object getDatum() {
@ -175,38 +186,56 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
this.physMemKbytes = AvroArrayUtils.fromAvro(datum.getPhysMemKbytes());
}
/** Get the task ID */
public TaskID getTaskId() { return attemptId.getTaskID(); }
/** Get the attempt id */
/** Gets the task ID. */
public TaskID getTaskId() {
return attemptId.getTaskID();
}
/** Gets the attempt id. */
public TaskAttemptID getAttemptId() {
return attemptId;
}
/** Get the task type */
/** Gets the task type. */
public TaskType getTaskType() {
return taskType;
}
/** Get the task status */
/** Gets the task status. */
public String getTaskStatus() { return taskStatus.toString(); }
/** Get the map phase finish time */
/** Gets the map phase finish time. */
public long getMapFinishTime() { return mapFinishTime; }
/** Get the attempt finish time */
/** Gets the attempt finish time. */
public long getFinishTime() { return finishTime; }
/** Get the host name */
/**
* Gets the task attempt start time.
* @return task attempt start time.
*/
public long getStartTime() {
return startTime;
}
/** Gets the host name. */
public String getHostname() { return hostname.toString(); }
/** Get the tracker rpc port */
/** Gets the tracker rpc port. */
public int getPort() { return port; }
/** Get the rack name */
/** Gets the rack name. */
public String getRackName() {
return rackName == null ? null : rackName.toString();
}
/** Get the state string */
public String getState() { return state.toString(); }
/** Get the counters */
Counters getCounters() { return counters; }
/** Get the event type */
/**
* Gets the attempt state string.
* @return map attempt state
*/
public String getState() {
return state.toString();
}
/**
* Gets the counters.
* @return counters
*/
Counters getCounters() {
return counters;
}
/** Gets the event type. */
public EventType getEventType() {
return EventType.MAP_ATTEMPT_FINISHED;
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record successful completion of a reduce attempt
@ -59,6 +60,7 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
int[] cpuUsages;
int[] vMemKbytes;
int[] physMemKbytes;
private long startTime;
/**
* Create an event to record completion of a reduce attempt
@ -76,13 +78,13 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
* @param allSplits the "splits", or a pixelated graph of various
* measurable worker node state variables against progress.
* Currently there are four; wallclock time, CPU time,
* virtual memory and physical memory.
* virtual memory and physical memory.
* @param startTs Task start time to be used for writing entity to ATSv2.
*/
public ReduceAttemptFinishedEvent
(TaskAttemptID id, TaskType taskType, String taskStatus,
long shuffleFinishTime, long sortFinishTime, long finishTime,
String hostname, int port, String rackName, String state,
Counters counters, int[][] allSplits) {
public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
String taskStatus, long shuffleFinishTime, long sortFinishTime,
long finishTime, String hostname, int port, String rackName,
String state, Counters counters, int[][] allSplits, long startTs) {
this.attemptId = id;
this.taskType = taskType;
this.taskStatus = taskStatus;
@ -99,6 +101,16 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits);
this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
this.startTime = startTs;
}
public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
String taskStatus, long shuffleFinishTime, long sortFinishTime,
long finishTime, String hostname, int port, String rackName,
String state, Counters counters, int[][] allSplits) {
this(id, taskType, taskStatus, shuffleFinishTime, sortFinishTime,
finishTime, hostname, port, rackName, state, counters, allSplits,
SystemClock.getInstance().getTime());
}
/**
@ -118,13 +130,12 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
* @param state State of the attempt
* @param counters Counters for the attempt
*/
public ReduceAttemptFinishedEvent
(TaskAttemptID id, TaskType taskType, String taskStatus,
long shuffleFinishTime, long sortFinishTime, long finishTime,
String hostname, String state, Counters counters) {
public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
String taskStatus, long shuffleFinishTime, long sortFinishTime,
long finishTime, String hostname, String state, Counters counters) {
this(id, taskType, taskStatus,
shuffleFinishTime, sortFinishTime, finishTime,
hostname, -1, "", state, counters, null);
shuffleFinishTime, sortFinishTime, finishTime,
hostname, -1, "", state, counters, null);
}
ReduceAttemptFinishedEvent() {}
@ -178,39 +189,55 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
this.physMemKbytes = AvroArrayUtils.fromAvro(datum.getPhysMemKbytes());
}
/** Get the Task ID */
/** Gets the Task ID. */
public TaskID getTaskId() { return attemptId.getTaskID(); }
/** Get the attempt id */
/** Gets the attempt id. */
public TaskAttemptID getAttemptId() {
return attemptId;
}
/** Get the task type */
/** Gets the task type. */
public TaskType getTaskType() {
return taskType;
}
/** Get the task status */
/** Gets the task status. */
public String getTaskStatus() { return taskStatus.toString(); }
/** Get the finish time of the sort phase */
/** Gets the finish time of the sort phase. */
public long getSortFinishTime() { return sortFinishTime; }
/** Get the finish time of the shuffle phase */
/** Gets the finish time of the shuffle phase. */
public long getShuffleFinishTime() { return shuffleFinishTime; }
/** Get the finish time of the attempt */
/** Gets the finish time of the attempt. */
public long getFinishTime() { return finishTime; }
/** Get the name of the host where the attempt ran */
/**
* Gets the start time.
* @return task attempt start time.
*/
public long getStartTime() {
return startTime;
}
/** Gets the name of the host where the attempt ran. */
public String getHostname() { return hostname.toString(); }
/** Get the tracker rpc port */
/** Gets the tracker rpc port. */
public int getPort() { return port; }
/** Get the rack name of the node where the attempt ran */
/** Gets the rack name of the node where the attempt ran. */
public String getRackName() {
return rackName == null ? null : rackName.toString();
}
/** Get the state string */
public String getState() { return state.toString(); }
/** Get the counters for the attempt */
Counters getCounters() { return counters; }
/** Get the event type */
/**
* Gets the state string.
* @return reduce attempt state
*/
public String getState() {
return state.toString();
}
/**
* Gets the counters.
* @return counters
*/
Counters getCounters() {
return counters;
}
/** Gets the event type. */
public EventType getEventType() {
return EventType.REDUCE_ATTEMPT_FINISHED;
}

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record successful task completion
@ -50,10 +51,11 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
private String hostname;
private String state;
private Counters counters;
private long startTime;
/**
* Create an event to record successful finishes for setup and cleanup
* attempts
* Create an event to record successful finishes for setup and cleanup
* attempts.
* @param id Attempt ID
* @param taskType Type of task
* @param taskStatus Status of task
@ -61,11 +63,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
* @param hostname Host where the attempt executed
* @param state State string
* @param counters Counters for the attempt
* @param startTs Task start time to be used for writing entity to ATSv2.
*/
public TaskAttemptFinishedEvent(TaskAttemptID id,
TaskType taskType, String taskStatus,
long finishTime, String rackName,
String hostname, String state, Counters counters) {
String hostname, String state, Counters counters, long startTs) {
this.attemptId = id;
this.taskType = taskType;
this.taskStatus = taskStatus;
@ -74,6 +77,14 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
this.hostname = hostname;
this.state = state;
this.counters = counters;
this.startTime = startTs;
}
public TaskAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
String taskStatus, long finishTime, String rackName, String hostname,
String state, Counters counters) {
this(id, taskType, taskStatus, finishTime, rackName, hostname, state,
counters, SystemClock.getInstance().getTime());
}
TaskAttemptFinishedEvent() {}
@ -107,33 +118,43 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
this.counters = EventReader.fromAvro(datum.getCounters());
}
/** Get the task ID */
/** Gets the task ID. */
public TaskID getTaskId() { return attemptId.getTaskID(); }
/** Get the task attempt id */
/** Gets the task attempt id. */
public TaskAttemptID getAttemptId() {
return attemptId;
}
/** Get the task type */
/** Gets the task type. */
public TaskType getTaskType() {
return taskType;
}
/** Get the task status */
/** Gets the task status. */
public String getTaskStatus() { return taskStatus.toString(); }
/** Get the attempt finish time */
/** Gets the attempt finish time. */
public long getFinishTime() { return finishTime; }
/** Get the host where the attempt executed */
/**
* Gets the task attempt start time to be used while publishing to ATSv2.
* @return task attempt start time.
*/
public long getStartTime() {
return startTime;
}
/** Gets the host where the attempt executed. */
public String getHostname() { return hostname.toString(); }
/** Get the rackname where the attempt executed */
/** Gets the rackname where the attempt executed. */
public String getRackName() {
return rackName == null ? null : rackName.toString();
}
/** Get the state string */
/**
* Gets the state string.
* @return task attempt state.
*/
public String getState() { return state.toString(); }
/** Get the counters for the attempt */
/** Gets the counters for the attempt. */
Counters getCounters() { return counters; }
/** Get the event type */
/** Gets the event type. */
public EventType getEventType() {
// Note that the task type can be setup/map/reduce/cleanup but the
// attempt-type can only be map/reduce.

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record unsuccessful (Killed/Failed) completion of task attempts
@ -58,10 +59,11 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
int[] cpuUsages;
int[] vMemKbytes;
int[] physMemKbytes;
private long startTime;
private static final Counters EMPTY_COUNTERS = new Counters();
/**
* Create an event to record the unsuccessful completion of attempts
/**
* Create an event to record the unsuccessful completion of attempts.
* @param id Attempt ID
* @param taskType Type of the task
* @param status Status of the attempt
@ -75,12 +77,13 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
* measurable worker node state variables against progress.
* Currently there are four; wallclock time, CPU time,
* virtual memory and physical memory.
* @param startTs Task start time to be used for writing entity to ATSv2.
*/
public TaskAttemptUnsuccessfulCompletionEvent
(TaskAttemptID id, TaskType taskType,
String status, long finishTime,
String hostname, int port, String rackName,
String error, Counters counters, int[][] allSplits) {
String error, Counters counters, int[][] allSplits, long startTs) {
this.attemptId = id;
this.taskType = taskType;
this.status = status;
@ -99,6 +102,15 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
this.physMemKbytes =
ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
this.startTime = startTs;
}
public TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID id,
TaskType taskType, String status, long finishTime, String hostname,
int port, String rackName, String error, Counters counters,
int[][] allSplits) {
this(id, taskType, status, finishTime, hostname, port, rackName, error,
counters, allSplits, SystemClock.getInstance().getTime());
}
/**
@ -190,39 +202,49 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
AvroArrayUtils.fromAvro(datum.getPhysMemKbytes());
}
/** Get the task id */
/** Gets the task id. */
public TaskID getTaskId() {
return attemptId.getTaskID();
}
/** Get the task type */
/** Gets the task type. */
public TaskType getTaskType() {
return TaskType.valueOf(taskType.toString());
}
/** Get the attempt id */
/** Gets the attempt id. */
public TaskAttemptID getTaskAttemptId() {
return attemptId;
}
/** Get the finish time */
/** Gets the finish time. */
public long getFinishTime() { return finishTime; }
/** Get the name of the host where the attempt executed */
/**
* Gets the task attempt start time to be used while publishing to ATSv2.
* @return task attempt start time.
*/
public long getStartTime() {
return startTime;
}
/** Gets the name of the host where the attempt executed. */
public String getHostname() { return hostname; }
/** Get the rpc port for the host where the attempt executed */
/** Gets the rpc port for the host where the attempt executed. */
public int getPort() { return port; }
/** Get the rack name of the node where the attempt ran */
/** Gets the rack name of the node where the attempt ran. */
public String getRackName() {
return rackName == null ? null : rackName.toString();
}
/** Get the error string */
/** Gets the error string. */
public String getError() { return error.toString(); }
/** Get the task status */
/**
* Gets the task attempt status.
* @return task attempt status.
*/
public String getTaskStatus() {
return status.toString();
}
/** Get the counters */
/** Gets the counters. */
Counters getCounters() { return counters; }
/** Get the event type */
/** Gets the event type. */
public EventType getEventType() {
// Note that the task type can be setup/map/reduce/cleanup but the
// attempt-type can only be map/reduce.

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record the failure of a task
@ -49,11 +50,12 @@ public class TaskFailedEvent implements HistoryEvent {
private String status;
private String error;
private Counters counters;
private long startTime;
private static final Counters EMPTY_COUNTERS = new Counters();
/**
* Create an event to record task failure
* Create an event to record task failure.
* @param id Task ID
* @param finishTime Finish time of the task
* @param taskType Type of the task
@ -61,10 +63,11 @@ public class TaskFailedEvent implements HistoryEvent {
* @param status Status
* @param failedDueToAttempt The attempt id due to which the task failed
* @param counters Counters for the task
* @param startTs task start time.
*/
public TaskFailedEvent(TaskID id, long finishTime,
TaskType taskType, String error, String status,
TaskAttemptID failedDueToAttempt, Counters counters) {
TaskAttemptID failedDueToAttempt, Counters counters, long startTs) {
this.id = id;
this.finishTime = finishTime;
this.taskType = taskType;
@ -72,15 +75,23 @@ public class TaskFailedEvent implements HistoryEvent {
this.status = status;
this.failedDueToAttempt = failedDueToAttempt;
this.counters = counters;
this.startTime = startTs;
}
public TaskFailedEvent(TaskID id, long finishTime, TaskType taskType,
String error, String status, TaskAttemptID failedDueToAttempt,
Counters counters) {
this(id, finishTime, taskType, error, status, failedDueToAttempt, counters,
SystemClock.getInstance().getTime());
}
public TaskFailedEvent(TaskID id, long finishTime,
TaskType taskType, String error, String status,
TaskAttemptID failedDueToAttempt) {
this(id, finishTime, taskType, error, status,
failedDueToAttempt, EMPTY_COUNTERS);
TaskType taskType, String error, String status,
TaskAttemptID failedDueToAttempt) {
this(id, finishTime, taskType, error, status, failedDueToAttempt,
EMPTY_COUNTERS);
}
TaskFailedEvent() {}
public Object getDatum() {
@ -118,27 +129,37 @@ public class TaskFailedEvent implements HistoryEvent {
EventReader.fromAvro(datum.getCounters());
}
/** Get the task id */
/** Gets the task id. */
public TaskID getTaskId() { return id; }
/** Get the error string */
/** Gets the error string. */
public String getError() { return error; }
/** Get the finish time of the attempt */
/** Gets the finish time of the attempt. */
public long getFinishTime() {
return finishTime;
}
/** Get the task type */
/**
* Gets the task start time to be reported to ATSv2.
* @return task start time.
*/
public long getStartTime() {
return startTime;
}
/** Gets the task type. */
public TaskType getTaskType() {
return taskType;
}
/** Get the attempt id due to which the task failed */
/** Gets the attempt id due to which the task failed. */
public TaskAttemptID getFailedAttemptID() {
return failedDueToAttempt;
}
/** Get the task status */
/**
* Gets the task status.
* @return task status
*/
public String getTaskStatus() { return status; }
/** Get task counters */
/** Gets task counters. */
public Counters getCounters() { return counters; }
/** Get the event type */
/** Gets the event type. */
public EventType getEventType() {
return EventType.TASK_FAILED;
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record the successful completion of a task
@ -49,27 +50,36 @@ public class TaskFinishedEvent implements HistoryEvent {
private TaskType taskType;
private String status;
private Counters counters;
private long startTime;
/**
* Create an event to record the successful completion of a task
* Create an event to record the successful completion of a task.
* @param id Task ID
* @param attemptId Task Attempt ID of the successful attempt for this task
* @param finishTime Finish time of the task
* @param taskType Type of the task
* @param status Status string
* @param counters Counters for the task
* @param startTs task start time
*/
public TaskFinishedEvent(TaskID id, TaskAttemptID attemptId, long finishTime,
TaskType taskType,
String status, Counters counters) {
String status, Counters counters, long startTs) {
this.taskid = id;
this.successfulAttemptId = attemptId;
this.finishTime = finishTime;
this.taskType = taskType;
this.status = status;
this.counters = counters;
this.startTime = startTs;
}
public TaskFinishedEvent(TaskID id, TaskAttemptID attemptId, long finishTime,
TaskType taskType, String status, Counters counters) {
this(id, attemptId, finishTime, taskType, status, counters,
SystemClock.getInstance().getTime());
}
TaskFinishedEvent() {}
public Object getDatum() {
@ -101,23 +111,33 @@ public class TaskFinishedEvent implements HistoryEvent {
this.counters = EventReader.fromAvro(datum.getCounters());
}
/** Get task id */
/** Gets task id. */
public TaskID getTaskId() { return taskid; }
/** Get successful task attempt id */
/** Gets successful task attempt id. */
public TaskAttemptID getSuccessfulTaskAttemptId() {
return successfulAttemptId;
}
/** Get the task finish time */
/** Gets the task finish time. */
public long getFinishTime() { return finishTime; }
/** Get task counters */
/**
* Gets the task start time to be reported to ATSv2.
* @return task start time
*/
public long getStartTime() {
return startTime;
}
/** Gets task counters. */
public Counters getCounters() { return counters; }
/** Get task type */
/** Gets task type. */
public TaskType getTaskType() {
return taskType;
}
/** Get task status */
/**
* Gets task status.
* @return task status
*/
public String getTaskStatus() { return status.toString(); }
/** Get event type */
/** Gets event type. */
public EventType getEventType() {
return EventType.TASK_FINISHED;
}

View File

@ -298,10 +298,10 @@ public class TestMRTimelineEventHandling {
" does not exist.",
jobEventFile.exists());
verifyEntity(jobEventFile, EventType.JOB_FINISHED.name(),
true, false, null);
true, false, null, false);
Set<String> cfgsToCheck = Sets.newHashSet("dummy_conf1", "dummy_conf2",
"huge_dummy_conf1", "huge_dummy_conf2");
verifyEntity(jobEventFile, null, false, true, cfgsToCheck);
verifyEntity(jobEventFile, null, false, true, cfgsToCheck, false);
// for this test, we expect MR job metrics are published in YARN_APPLICATION
String outputAppDir =
@ -322,8 +322,8 @@ public class TestMRTimelineEventHandling {
"appEventFilePath: " + appEventFilePath +
" does not exist.",
appEventFile.exists());
verifyEntity(appEventFile, null, true, false, null);
verifyEntity(appEventFile, null, false, true, cfgsToCheck);
verifyEntity(appEventFile, null, true, false, null, false);
verifyEntity(appEventFile, null, false, true, cfgsToCheck, false);
// check for task event file
String outputDirTask =
@ -344,7 +344,7 @@ public class TestMRTimelineEventHandling {
" does not exist.",
taskEventFile.exists());
verifyEntity(taskEventFile, EventType.TASK_FINISHED.name(),
true, false, null);
true, false, null, true);
// check for task attempt event file
String outputDirTaskAttempt =
@ -363,7 +363,7 @@ public class TestMRTimelineEventHandling {
Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath +
" does not exist.", taskAttemptEventFile.exists());
verifyEntity(taskAttemptEventFile, EventType.MAP_ATTEMPT_FINISHED.name(),
true, false, null);
true, false, null, true);
}
/**
@ -380,12 +380,13 @@ public class TestMRTimelineEventHandling {
* @throws IOException
*/
private void verifyEntity(File entityFile, String eventId,
boolean chkMetrics, boolean chkCfg, Set<String> cfgsToVerify)
throws IOException {
boolean chkMetrics, boolean chkCfg, Set<String> cfgsToVerify,
boolean checkIdPrefix) throws IOException {
BufferedReader reader = null;
String strLine;
try {
reader = new BufferedReader(new FileReader(entityFile));
long idPrefix = -1;
while ((strLine = reader.readLine()) != null) {
if (strLine.trim().length() > 0) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
@ -394,6 +395,19 @@ public class TestMRTimelineEventHandling {
strLine.trim(),
org.apache.hadoop.yarn.api.records.timelineservice.
TimelineEntity.class);
LOG.info("strLine.trim()= " + strLine.trim());
if (checkIdPrefix) {
Assert.assertTrue("Entity ID prefix expected to be > 0" ,
entity.getIdPrefix() > 0);
if (idPrefix == -1) {
idPrefix = entity.getIdPrefix();
} else {
Assert.assertEquals("Entity ID prefix should be same across " +
"each publish of same entity",
idPrefix, entity.getIdPrefix());
}
}
if (eventId == null) {
// Job metrics are published without any events for
// ApplicationEntity. There is also possibility that some other

View File

@ -103,6 +103,8 @@ import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.log4j.LogManager;
@ -306,6 +308,17 @@ public class ApplicationMaster {
protected final Set<ContainerId> launchedContainers =
Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
/**
* Container start times used to set id prefix while publishing entity
* to ATSv2.
*/
private final ConcurrentMap<ContainerId, Long> containerStartTimes =
new ConcurrentHashMap<ContainerId, Long>();
private ConcurrentMap<ContainerId, Long> getContainerStartTimes() {
return containerStartTimes;
}
/**
* @param args Command line args
*/
@ -855,7 +868,15 @@ public class ApplicationMaster {
}
if(timelineClient != null) {
if (timelineServiceV2) {
publishContainerEndEventOnTimelineServiceV2(containerStatus);
Long containerStartTime =
containerStartTimes.get(containerStatus.getContainerId());
if (containerStartTime == null) {
containerStartTime = SystemClock.getInstance().getTime();
containerStartTimes.put(containerStatus.getContainerId(),
containerStartTime);
}
publishContainerEndEventOnTimelineServiceV2(containerStatus,
containerStartTime);
} else {
publishContainerEndEvent(
timelineClient, containerStatus, domainId, appSubmitterUgi);
@ -985,8 +1006,11 @@ public class ApplicationMaster {
}
if(applicationMaster.timelineClient != null) {
if (applicationMaster.timelineServiceV2) {
long startTime = SystemClock.getInstance().getTime();
applicationMaster.getContainerStartTimes().put(
containerId, startTime);
applicationMaster.publishContainerStartEventOnTimelineServiceV2(
container);
container, startTime);
} else {
applicationMaster.publishContainerStartEvent(
applicationMaster.timelineClient, container,
@ -1359,24 +1383,24 @@ public class ApplicationMaster {
}
private void publishContainerStartEventOnTimelineServiceV2(
Container container) {
Container container, long startTime) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
entity =
new org.apache.hadoop.yarn.api.records.timelineservice.
TimelineEntity();
entity.setId(container.getId().toString());
entity.setType(DSEntity.DS_CONTAINER.toString());
long ts = System.currentTimeMillis();
entity.setCreatedTime(ts);
entity.setCreatedTime(startTime);
entity.addInfo("user", appSubmitterUgi.getShortUserName());
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
event.setTimestamp(ts);
event.setTimestamp(startTime);
event.setId(DSEvent.DS_CONTAINER_START.toString());
event.addInfo("Node", container.getNodeId().toString());
event.addInfo("Resources", container.getResource().toString());
entity.addEvent(event);
entity.setIdPrefix(TimelineServiceHelper.invertLong(startTime));
try {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@ -1394,7 +1418,7 @@ public class ApplicationMaster {
}
private void publishContainerEndEventOnTimelineServiceV2(
final ContainerStatus container) {
final ContainerStatus container, long containerStartTime) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
entity =
new org.apache.hadoop.yarn.api.records.timelineservice.
@ -1410,6 +1434,7 @@ public class ApplicationMaster {
event.addInfo("State", container.getState().name());
event.addInfo("Exit Status", container.getExitStatus());
entity.addEvent(event);
entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
try {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@ -1444,6 +1469,8 @@ public class ApplicationMaster {
event.setId(appEvent.toString());
event.setTimestamp(ts);
entity.addEvent(event);
entity.setIdPrefix(
TimelineServiceHelper.invertLong(appAttemptID.getAttemptId()));
try {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {

View File

@ -64,7 +64,9 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster.DSEvent;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
@ -81,6 +83,7 @@ import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
@ -523,15 +526,31 @@ public class TestDistributedShell {
"appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+ "_000001"
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
verifyEntityTypeFileExists(basePath, "DS_APP_ATTEMPT",
appTimestampFileName);
File dsAppAttemptEntityFile = verifyEntityTypeFileExists(basePath,
"DS_APP_ATTEMPT", appTimestampFileName);
// Check if required events are published and same idprefix is sent for
// on each publish.
verifyEntityForTimelineV2(dsAppAttemptEntityFile,
DSEvent.DS_APP_ATTEMPT_START.toString(), 1, 1, 0, true);
// to avoid race condition of testcase, atleast check 40 times with sleep
// of 50ms
verifyEntityForTimelineV2(dsAppAttemptEntityFile,
DSEvent.DS_APP_ATTEMPT_END.toString(), 1, 40, 50, true);
// Verify DS_CONTAINER entities posted by the client
// Verify DS_CONTAINER entities posted by the client.
String containerTimestampFileName =
"container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+ "_01_000002.thist";
verifyEntityTypeFileExists(basePath, "DS_CONTAINER",
containerTimestampFileName);
File dsContainerEntityFile = verifyEntityTypeFileExists(basePath,
"DS_CONTAINER", containerTimestampFileName);
// Check if required events are published and same idprefix is sent for
// on each publish.
verifyEntityForTimelineV2(dsContainerEntityFile,
DSEvent.DS_CONTAINER_START.toString(), 1, 1, 0, true);
// to avoid race condition of testcase, atleast check 40 times with sleep
// of 50ms
verifyEntityForTimelineV2(dsContainerEntityFile,
DSEvent.DS_CONTAINER_END.toString(), 1, 40, 50, true);
// Verify NM posting container metrics info.
String containerMetricsTimestampFileName =
@ -541,29 +560,13 @@ public class TestDistributedShell {
File containerEntityFile = verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_CONTAINER.toString(),
containerMetricsTimestampFileName);
Assert.assertEquals(
"Container created event needs to be published atleast once",
1,
getNumOfStringOccurences(containerEntityFile,
ContainerMetricsConstants.CREATED_EVENT_TYPE));
verifyEntityForTimelineV2(containerEntityFile,
ContainerMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, true);
// to avoid race condition of testcase, atleast check 4 times with sleep
// of 500ms
long numOfContainerFinishedOccurences = 0;
for (int i = 0; i < 4; i++) {
numOfContainerFinishedOccurences =
getNumOfStringOccurences(containerEntityFile,
ContainerMetricsConstants.FINISHED_EVENT_TYPE);
if (numOfContainerFinishedOccurences > 0) {
break;
} else {
Thread.sleep(500L);
}
}
Assert.assertEquals(
"Container finished event needs to be published atleast once",
1,
numOfContainerFinishedOccurences);
// to avoid race condition of testcase, atleast check 40 times with sleep
// of 50ms
verifyEntityForTimelineV2(containerEntityFile,
ContainerMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, true);
// Verify RM posting Application life cycle Events are getting published
String appMetricsTimestampFileName =
@ -573,29 +576,14 @@ public class TestDistributedShell {
verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_APPLICATION.toString(),
appMetricsTimestampFileName);
Assert.assertEquals(
"Application created event should be published atleast once",
1,
getNumOfStringOccurences(appEntityFile,
ApplicationMetricsConstants.CREATED_EVENT_TYPE));
// No need to check idprefix for app.
verifyEntityForTimelineV2(appEntityFile,
ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, false);
// to avoid race condition of testcase, atleast check 4 times with sleep
// of 500ms
long numOfStringOccurences = 0;
for (int i = 0; i < 4; i++) {
numOfStringOccurences =
getNumOfStringOccurences(appEntityFile,
ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
if (numOfStringOccurences > 0) {
break;
} else {
Thread.sleep(500L);
}
}
Assert.assertEquals(
"Application finished event should be published atleast once",
1,
numOfStringOccurences);
// to avoid race condition of testcase, atleast check 40 times with sleep
// of 50ms
verifyEntityForTimelineV2(appEntityFile,
ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, false);
// Verify RM posting AppAttempt life cycle Events are getting published
String appAttemptMetricsTimestampFileName =
@ -606,17 +594,10 @@ public class TestDistributedShell {
verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
appAttemptMetricsTimestampFileName);
Assert.assertEquals(
"AppAttempt register event should be published atleast once",
1,
getNumOfStringOccurences(appAttemptEntityFile,
AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE));
Assert.assertEquals(
"AppAttempt finished event should be published atleast once",
1,
getNumOfStringOccurences(appAttemptEntityFile,
AppAttemptMetricsConstants.FINISHED_EVENT_TYPE));
verifyEntityForTimelineV2(appAttemptEntityFile,
AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1, 1, 0, true);
verifyEntityForTimelineV2(appAttemptEntityFile,
AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, 1, 0, true);
} finally {
FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
}
@ -636,22 +617,64 @@ public class TestDistributedShell {
return entityFile;
}
private long getNumOfStringOccurences(File entityFile, String searchString)
throws IOException {
BufferedReader reader = null;
String strLine;
/**
* Checks the events and idprefix published for an entity.
*
* @param entityFile Entity file.
* @param expectedEvent Expected event Id.
* @param numOfExpectedEvent Number of expected occurences of expected event
* id.
* @param checkTimes Number of times to check.
* @param sleepTime Sleep time for each iteration.
* @param checkIdPrefix Whether to check idprefix.
* @throws IOException if entity file reading fails.
* @throws InterruptedException if sleep is interrupted.
*/
private void verifyEntityForTimelineV2(File entityFile, String expectedEvent,
long numOfExpectedEvent, int checkTimes, long sleepTime,
boolean checkIdPrefix) throws IOException, InterruptedException {
long actualCount = 0;
try {
reader = new BufferedReader(new FileReader(entityFile));
while ((strLine = reader.readLine()) != null) {
if (strLine.trim().contains(searchString)) {
actualCount++;
for (int i = 0; i < checkTimes; i++) {
BufferedReader reader = null;
String strLine = null;
actualCount = 0;
try {
reader = new BufferedReader(new FileReader(entityFile));
long idPrefix = -1;
while ((strLine = reader.readLine()) != null) {
String entityLine = strLine.trim();
if (entityLine.isEmpty()) {
continue;
}
if (entityLine.contains(expectedEvent)) {
actualCount++;
}
if (checkIdPrefix) {
TimelineEntity entity = FileSystemTimelineReaderImpl.
getTimelineRecordFromJSON(entityLine, TimelineEntity.class);
Assert.assertTrue("Entity ID prefix expected to be > 0" ,
entity.getIdPrefix() > 0);
if (idPrefix == -1) {
idPrefix = entity.getIdPrefix();
} else {
Assert.assertEquals("Entity ID prefix should be same across " +
"each publish of same entity",
idPrefix, entity.getIdPrefix());
}
}
}
} finally {
reader.close();
}
if (numOfExpectedEvent == actualCount) {
break;
}
if (sleepTime > 0 && i < checkTimes - 1) {
Thread.sleep(sleepTime);
}
} finally {
reader.close();
}
return actualCount;
Assert.assertEquals("Unexpected number of " + expectedEvent +
" event published.", numOfExpectedEvent, actualCount);
}
/**

View File

@ -155,6 +155,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProv
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@ -1048,10 +1049,11 @@ public class ContainerManagerImpl extends CompositeService implements
Credentials credentials =
YarnServerSecurityUtils.parseCredentials(launchContext);
long containerStartTime = SystemClock.getInstance().getTime();
Container container =
new ContainerImpl(getConfig(), this.dispatcher,
launchContext, credentials, metrics, containerTokenIdentifier,
context);
context, containerStartTime);
ApplicationId applicationID =
containerId.getApplicationAttemptId().getApplicationId();
if (context.getContainers().putIfAbsent(containerId, container) != null) {
@ -1104,7 +1106,7 @@ public class ContainerManagerImpl extends CompositeService implements
}
this.context.getNMStateStore().storeContainer(containerId,
containerTokenIdentifier.getVersion(), request);
containerTokenIdentifier.getVersion(), containerStartTime, request);
dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container));

View File

@ -23,12 +23,16 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
public class ApplicationContainerFinishedEvent extends ApplicationEvent {
private ContainerStatus containerStatus;
// Required by NMTimelinePublisher.
private long containerStartTime;
public ApplicationContainerFinishedEvent(ContainerStatus containerStatus) {
public ApplicationContainerFinishedEvent(ContainerStatus containerStatus,
long containerStartTs) {
super(containerStatus.getContainerId().getApplicationAttemptId().
getApplicationId(),
ApplicationEventType.APPLICATION_CONTAINER_FINISHED);
this.containerStatus = containerStatus;
this.containerStartTime = containerStartTs;
}
public ContainerId getContainerID() {
@ -39,4 +43,7 @@ public class ApplicationContainerFinishedEvent extends ApplicationEvent {
return containerStatus;
}
public long getContainerStartTime() {
return containerStartTime;
}
}

View File

@ -37,6 +37,8 @@ public interface Container extends EventHandler<ContainerEvent> {
ContainerId getContainerId();
long getContainerStartTime();
Resource getResource();
ContainerTokenIdentifier getContainerTokenIdentifier();

View File

@ -176,11 +176,11 @@ public class ContainerImpl implements Container {
/** The NM-wide configuration - not specific to this container */
private final Configuration daemonConf;
private final long startTime;
private static final Logger LOG =
LoggerFactory.getLogger(ContainerImpl.class);
// whether container has been recovered after a restart
private RecoveredContainerStatus recoveredStatus =
RecoveredContainerStatus.REQUESTED;
@ -193,6 +193,16 @@ public class ContainerImpl implements Container {
ContainerLaunchContext launchContext, Credentials creds,
NodeManagerMetrics metrics,
ContainerTokenIdentifier containerTokenIdentifier, Context context) {
this(conf, dispatcher, launchContext, creds, metrics,
containerTokenIdentifier, context, SystemClock.getInstance().getTime());
}
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds,
NodeManagerMetrics metrics,
ContainerTokenIdentifier containerTokenIdentifier, Context context,
long startTs) {
this.startTime = startTs;
this.daemonConf = conf;
this.dispatcher = dispatcher;
this.stateStore = context.getNMStateStore();
@ -266,7 +276,7 @@ public class ContainerImpl implements Container {
ContainerTokenIdentifier containerTokenIdentifier, Context context,
RecoveredContainerState rcs) {
this(conf, dispatcher, launchContext, creds, metrics,
containerTokenIdentifier, context);
containerTokenIdentifier, context, rcs.getStartTime());
this.recoveredStatus = rcs.getStatus();
this.exitCode = rcs.getExitCode();
this.recoveredAsKilled = rcs.getKilled();
@ -846,6 +856,11 @@ public class ContainerImpl implements Container {
return this.containerId;
}
@Override
public long getContainerStartTime() {
return this.startTime;
}
@Override
public Resource getResource() {
return Resources.clone(
@ -909,11 +924,12 @@ public class ContainerImpl implements Container {
@SuppressWarnings("rawtypes")
EventHandler eventHandler = dispatcher.getEventHandler();
ContainerStatus containerStatus = cloneAndGetContainerStatus();
eventHandler.handle(new ApplicationContainerFinishedEvent(containerStatus));
// Tell the scheduler the container is Done
eventHandler.handle(new ContainerSchedulerEvent(this,
ContainerSchedulerEventType.CONTAINER_COMPLETED));
ContainerStatus containerStatus = cloneAndGetContainerStatus();
eventHandler.handle(
new ApplicationContainerFinishedEvent(containerStatus, startTime));
// Remove the container from the resource-monitor
eventHandler.handle(new ContainerStopMonitoringEvent(containerId));

View File

@ -114,6 +114,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
"ContainerManager/containers/";
private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request";
private static final String CONTAINER_VERSION_KEY_SUFFIX = "/version";
private static final String CONTAINER_START_TIME_KEY_SUFFIX = "/starttime";
private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";
private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued";
@ -260,6 +261,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
StartContainerRequestProto.parseFrom(entry.getValue()));
} else if (suffix.equals(CONTAINER_VERSION_KEY_SUFFIX)) {
rcs.version = Integer.parseInt(asString(entry.getValue()));
} else if (suffix.equals(CONTAINER_START_TIME_KEY_SUFFIX)) {
rcs.setStartTime(Long.parseLong(asString(entry.getValue())));
} else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) {
rcs.diagnostics = asString(entry.getValue());
} else if (suffix.equals(CONTAINER_QUEUED_KEY_SUFFIX)) {
@ -314,21 +317,23 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
@Override
public void storeContainer(ContainerId containerId, int containerVersion,
StartContainerRequest startRequest) throws IOException {
long startTime, StartContainerRequest startRequest) throws IOException {
String idStr = containerId.toString();
if (LOG.isDebugEnabled()) {
LOG.debug("storeContainer: containerId= " + idStr
+ ", startRequest= " + startRequest);
}
String keyRequest = CONTAINERS_KEY_PREFIX + idStr
+ CONTAINER_REQUEST_KEY_SUFFIX;
String keyRequest = getContainerKey(idStr, CONTAINER_REQUEST_KEY_SUFFIX);
String keyVersion = getContainerVersionKey(idStr);
String keyStartTime =
getContainerKey(idStr, CONTAINER_START_TIME_KEY_SUFFIX);
try {
WriteBatch batch = db.createWriteBatch();
try {
batch.put(bytes(keyRequest),
((StartContainerRequestPBImpl) startRequest)
.getProto().toByteArray());
((StartContainerRequestPBImpl) startRequest).getProto().
toByteArray());
batch.put(bytes(keyStartTime), bytes(Long.toString(startTime)));
if (containerVersion != 0) {
batch.put(bytes(keyVersion),
bytes(Integer.toString(containerVersion)));
@ -344,7 +349,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
@VisibleForTesting
String getContainerVersionKey(String containerId) {
return CONTAINERS_KEY_PREFIX + containerId + CONTAINER_VERSION_KEY_SUFFIX;
return getContainerKey(containerId, CONTAINER_VERSION_KEY_SUFFIX);
}
private String getContainerKey(String containerId, String suffix) {
return CONTAINERS_KEY_PREFIX + containerId + suffix;
}
@Override

View File

@ -71,7 +71,7 @@ public class NMNullStateStoreService extends NMStateStoreService {
@Override
public void storeContainer(ContainerId containerId, int version,
StartContainerRequest startRequest) throws IOException {
long startTime, StartContainerRequest startRequest) throws IOException {
}
@Override

View File

@ -89,6 +89,7 @@ public abstract class NMStateStoreService extends AbstractService {
int version;
private RecoveredContainerType recoveryType =
RecoveredContainerType.RECOVER;
private long startTime;
public RecoveredContainerStatus getStatus() {
return status;
@ -110,6 +111,14 @@ public abstract class NMStateStoreService extends AbstractService {
return version;
}
public long getStartTime() {
return startTime;
}
public void setStartTime(long ts) {
startTime = ts;
}
public StartContainerRequest getStartRequest() {
return startRequest;
}
@ -147,6 +156,7 @@ public abstract class NMStateStoreService extends AbstractService {
return new StringBuffer("Status: ").append(getStatus())
.append(", Exit code: ").append(exitCode)
.append(", Version: ").append(version)
.append(", Start Time: ").append(startTime)
.append(", Killed: ").append(getKilled())
.append(", Diagnostics: ").append(getDiagnostics())
.append(", Capability: ").append(getCapability())
@ -367,11 +377,12 @@ public abstract class NMStateStoreService extends AbstractService {
* Record a container start request
* @param containerId the container ID
* @param containerVersion the container Version
* @param startTime container start time
* @param startRequest the container start request
* @throws IOException
*/
public abstract void storeContainer(ContainerId containerId,
int containerVersion, StartContainerRequest startRequest)
int containerVersion, long startTime, StartContainerRequest startRequest)
throws IOException;
/**

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ContainerMetric;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting;
@ -140,6 +141,8 @@ public class NMTimelinePublisher extends CompositeService {
Math.round(cpuUsagePercentPerCore));
entity.addMetric(cpuMetric);
}
entity.setIdPrefix(TimelineServiceHelper.
invertLong(container.getContainerStartTime()));
ApplicationId appId = container.getContainerId().getApplicationAttemptId()
.getApplicationId();
try {
@ -186,15 +189,17 @@ public class NMTimelinePublisher extends CompositeService {
tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
long containerStartTime = container.getContainerStartTime();
entity.addEvent(tEvent);
entity.setCreatedTime(event.getTimestamp());
entity.setCreatedTime(containerStartTime);
entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
containerId.getApplicationAttemptId().getApplicationId()));
}
@SuppressWarnings("unchecked")
private void publishContainerFinishedEvent(ContainerStatus containerStatus,
long timeStamp) {
long containerFinishTime, long containerStartTime) {
ContainerId containerId = containerStatus.getContainerId();
TimelineEntity entity = createContainerEntity(containerId);
@ -206,13 +211,14 @@ public class NMTimelinePublisher extends CompositeService {
entityInfo.put(ContainerMetricsConstants.STATE_INFO,
ContainerState.COMPLETE.toString());
entityInfo.put(ContainerMetricsConstants.CONTAINER_FINISHED_TIME,
timeStamp);
containerFinishTime);
entity.setInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(timeStamp);
tEvent.setTimestamp(containerFinishTime);
entity.addEvent(tEvent);
entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
containerId.getApplicationAttemptId().getApplicationId()));
@ -228,6 +234,8 @@ public class NMTimelinePublisher extends CompositeService {
tEvent.setId(eventType);
tEvent.setTimestamp(event.getTimestamp());
entity.addEvent(tEvent);
entity.setIdPrefix(TimelineServiceHelper.
invertLong(container.getContainerStartTime()));
ApplicationId appId =
container.getContainerId().getApplicationAttemptId().getApplicationId();
@ -291,7 +299,7 @@ public class NMTimelinePublisher extends CompositeService {
ApplicationContainerFinishedEvent evnt =
(ApplicationContainerFinishedEvent) event;
publishContainerFinishedEvent(evnt.getContainerStatus(),
event.getTimestamp());
event.getTimestamp(), evnt.getContainerStartTime());
break;
default:

View File

@ -601,7 +601,7 @@ public class TestApplication {
public void containerFinished(int containerNum) {
app.handle(new ApplicationContainerFinishedEvent(containers.get(
containerNum).cloneAndGetContainerStatus()));
containerNum).cloneAndGetContainerStatus(), 0));
drainDispatcherEvents();
}

View File

@ -127,10 +127,12 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
@Override
public synchronized void storeContainer(ContainerId containerId,
int version, StartContainerRequest startRequest) throws IOException {
int version, long startTime, StartContainerRequest startRequest)
throws IOException {
RecoveredContainerState rcs = new RecoveredContainerState();
rcs.startRequest = startRequest;
rcs.version = version;
rcs.setStartTime(startTime);
containerStates.put(containerId, rcs);
}

View File

@ -234,7 +234,8 @@ public class TestNMLeveldbStateStoreService {
StartContainerRequest containerReq = createContainerRequest(containerId);
// store a container and verify recovered
stateStore.storeContainer(containerId, 0, containerReq);
long containerStartTime = System.currentTimeMillis();
stateStore.storeContainer(containerId, 0, containerStartTime, containerReq);
// verify the container version key is not stored for new containers
DB db = stateStore.getDB();
@ -246,6 +247,7 @@ public class TestNMLeveldbStateStoreService {
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(0, rcs.getVersion());
assertEquals(containerStartTime, rcs.getStartTime());
assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
@ -1021,7 +1023,7 @@ public class TestNMLeveldbStateStoreService {
StartContainerRequest containerReq = StartContainerRequest.newInstance(clc,
containerToken);
stateStore.storeContainer(containerId, 0, containerReq);
stateStore.storeContainer(containerId, 0, 0, containerReq);
// add a invalid key
byte[] invalidKey = ("ContainerManager/containers/"

View File

@ -236,6 +236,10 @@ public class MockContainer implements Container {
@Override
public void sendPauseEvent(String description) {
}
@Override
public long getContainerStartTime() {
return 0;
}
}

View File

@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting;
@ -287,8 +288,8 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
@Override
public void appAttemptRegistered(RMAppAttempt appAttempt,
long registeredTime) {
TimelineEntity entity =
createAppAttemptEntity(appAttempt.getAppAttemptId());
ApplicationAttemptId attemptId = appAttempt.getAppAttemptId();
TimelineEntity entity = createAppAttemptEntity(attemptId);
entity.setCreatedTime(registeredTime);
TimelineEvent tEvent = new TimelineEvent();
@ -310,6 +311,8 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
appAttempt.getMasterContainer().getId().toString());
}
entity.setInfo(entityInfo);
entity.setIdPrefix(
TimelineServiceHelper.invertLong(attemptId.getAttemptId()));
getDispatcher().getEventHandler().handle(
new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
@ -320,7 +323,7 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
@Override
public void appAttemptFinished(RMAppAttempt appAttempt,
RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
ApplicationAttemptId attemptId = appAttempt.getAppAttemptId();
ApplicationAttemptEntity entity =
createAppAttemptEntity(appAttempt.getAppAttemptId());
@ -339,7 +342,8 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
entityInfo.put(AppAttemptMetricsConstants.STATE_INFO, RMServerUtils
.createApplicationAttemptState(appAttemtpState).toString());
entity.setInfo(entityInfo);
entity.setIdPrefix(
TimelineServiceHelper.invertLong(attemptId.getAttemptId()));
getDispatcher().getEventHandler().handle(
new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,

View File

@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineC
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.AfterClass;
import org.junit.Assert;
@ -211,7 +212,8 @@ public class TestSystemMetricsPublisherForV2 {
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
File appFile = new File(outputDirApp, timelineServiceFileName);
Assert.assertTrue(appFile.exists());
verifyEntity(appFile, 3, ApplicationMetricsConstants.CREATED_EVENT_TYPE, 6);
verifyEntity(
appFile, 3, ApplicationMetricsConstants.CREATED_EVENT_TYPE, 6, 0);
}
@Test(timeout = 10000)
@ -246,7 +248,7 @@ public class TestSystemMetricsPublisherForV2 {
File appFile = new File(outputDirApp, timelineServiceFileName);
Assert.assertTrue(appFile.exists());
verifyEntity(appFile, 2, AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE,
0);
0, TimelineServiceHelper.invertLong(appAttemptId.getAttemptId()));
}
@Test(timeout = 10000)
@ -278,7 +280,7 @@ public class TestSystemMetricsPublisherForV2 {
File appFile = new File(outputDirApp, timelineServiceFileName);
Assert.assertTrue(appFile.exists());
verifyEntity(appFile, 2,
ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE, 0);
ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE, 0, 0);
}
private RMApp createAppAndRegister(ApplicationId appId) {
@ -292,7 +294,8 @@ public class TestSystemMetricsPublisherForV2 {
}
private static void verifyEntity(File entityFile, long expectedEvents,
String eventForCreatedTime, long expectedMetrics) throws IOException {
String eventForCreatedTime, long expectedMetrics, long idPrefix)
throws IOException {
BufferedReader reader = null;
String strLine;
long count = 0;
@ -304,6 +307,7 @@ public class TestSystemMetricsPublisherForV2 {
TimelineEntity entity = FileSystemTimelineReaderImpl.
getTimelineRecordFromJSON(strLine.trim(), TimelineEntity.class);
metricsCount = entity.getMetrics().size();
assertEquals(idPrefix, entity.getIdPrefix());
for (TimelineEvent event : entity.getEvents()) {
if (event.getId().equals(eventForCreatedTime)) {
assertTrue(entity.getCreatedTime() > 0);
@ -388,6 +392,7 @@ public class TestSystemMetricsPublisherForV2 {
when(appAttempt.getTrackingUrl()).thenReturn("test tracking url");
when(appAttempt.getOriginalTrackingUrl()).thenReturn(
"test original tracking url");
when(appAttempt.getStartTime()).thenReturn(200L);
return appAttempt;
}