MAPREDUCE-6424. Store MR counters as timeline metrics instead of event. (Naganarasimha G R via varunsaxena)

This commit is contained in:
Varun Saxena 2016-05-01 17:17:24 +05:30 committed by Sangjin Lee
parent c2055a97d5
commit f0dbd7a40f
25 changed files with 345 additions and 59 deletions

View File

@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
@ -68,6 +69,8 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
@ -1072,6 +1075,15 @@ public class JobHistoryEventHandler extends AbstractService
return entity;
}
// create ApplicationEntity with job finished Metrics from HistoryEvent
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createAppEntityWithJobMetrics(HistoryEvent event, JobId jobId) {
ApplicationEntity entity = new ApplicationEntity();
entity.setId(jobId.getAppId().toString());
entity.setMetrics(event.getTimelineMetrics());
return entity;
}
// create BaseEntity from HistoryEvent with adding other info, like:
// timestamp and entityType.
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
@ -1088,6 +1100,10 @@ public class JobHistoryEventHandler extends AbstractService
if (setCreatedTime) {
entity.setCreatedTime(timestamp);
}
Set<TimelineMetric> timelineMetrics = event.getTimelineMetrics();
if (timelineMetrics != null) {
entity.setMetrics(timelineMetrics);
}
return entity;
}
@ -1203,10 +1219,17 @@ public class JobHistoryEventHandler extends AbstractService
" and handled by timeline service.");
return;
}
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
appEntityWithJobMetrics = null;
if (taskId == null) {
// JobEntity
tEntity = createJobEntity(event, timestamp, jobId,
MAPREDUCE_JOB_ENTITY_TYPE, setCreatedTime);
if (event.getEventType() == EventType.JOB_FINISHED
&& event.getTimelineMetrics() != null) {
appEntityWithJobMetrics = createAppEntityWithJobMetrics(event, jobId);
}
} else {
if (taskAttemptId == null) {
// TaskEntity
@ -1221,7 +1244,11 @@ public class JobHistoryEventHandler extends AbstractService
}
}
try {
timelineClient.putEntitiesAsync(tEntity);
if (appEntityWithJobMetrics == null) {
timelineClient.putEntitiesAsync(tEntity);
} else {
timelineClient.putEntities(tEntity, appEntityWithJobMetrics);
}
} catch (IOException | YarnException e) {
LOG.error("Failed to process Event " + event.getEventType()
+ " for the job : " + jobId, e);

View File

@ -18,13 +18,15 @@
package org.apache.hadoop.mapreduce.jobhistory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import static org.junit.Assert.*;
import java.util.Set;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@ -36,6 +38,7 @@ import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.junit.Test;
public class TestEvents {
@ -410,6 +413,11 @@ public class TestEvents {
public TimelineEvent toTimelineEvent() {
return null;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
return null;
}
}

View File

@ -18,16 +18,18 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.avro.util.Utf8;
/**
* Event to record start of a task attempt
*
@ -183,5 +185,9 @@ public class AMStartedEvent implements HistoryEvent {
tEvent.addInfo("START_TIME", getStartTime());
return tEvent;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
return null;
}
}

View File

@ -18,9 +18,12 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
/**
* Interface for event wrapper classes. Implementations each wrap an
@ -41,4 +44,7 @@ public interface HistoryEvent {
/** Map HistoryEvent to TimelineEvent */
TimelineEvent toTimelineEvent();
/** Counters or Metrics if any else return null. */
Set<TimelineMetric> getTimelineMetrics();
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -26,6 +28,7 @@ import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
/**
* Event to record successful completion of job
@ -148,14 +151,19 @@ public class JobFinishedEvent implements HistoryEvent {
tEvent.addInfo("FAILED_REDUCES", getFailedReduces());
tEvent.addInfo("FINISHED_MAPS", getFinishedMaps());
tEvent.addInfo("FINISHED_REDUCES", getFinishedReduces());
tEvent.addInfo("MAP_COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(getMapCounters()));
tEvent.addInfo("REDUCE_COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(getReduceCounters()));
tEvent.addInfo("TOTAL_COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(getTotalCounters()));
// TODO replace SUCCEEDED with JobState.SUCCEEDED.toString()
tEvent.addInfo("JOB_STATUS", "SUCCEEDED");
return tEvent;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
Set<TimelineMetric> jobMetrics = JobHistoryEventUtils
.countersToTimelineMetric(getMapCounters(), finishTime);
jobMetrics.addAll(JobHistoryEventUtils
.countersToTimelineMetric(getReduceCounters(), finishTime));
jobMetrics.addAll(JobHistoryEventUtils
.countersToTimelineMetric(getTotalCounters(), finishTime));
return jobMetrics;
}
}

View File

@ -18,15 +18,15 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
/**
* Event to record changes in the submit and launch time of
@ -76,4 +76,8 @@ public class JobInfoChangeEvent implements HistoryEvent {
return tEvent;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
return null;
}
}

View File

@ -18,13 +18,15 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
/**
* Event to record the initialization of a job
@ -87,4 +89,9 @@ public class JobInitedEvent implements HistoryEvent {
tEvent.addInfo("UBERIZED", getUberized());
return tEvent;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
return null;
}
}

View File

@ -18,16 +18,16 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
/**
* Event to record the change of priority of a job
@ -75,4 +75,9 @@ public class JobPriorityChangeEvent implements HistoryEvent {
return tEvent;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
return null;
}
}

View File

@ -18,10 +18,13 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
@SuppressWarnings("deprecation")
public class JobQueueChangeEvent implements HistoryEvent {
@ -70,4 +73,9 @@ public class JobQueueChangeEvent implements HistoryEvent {
return tEvent;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
return null;
}
}

View File

@ -18,15 +18,15 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
/**
* Event to record the change of status for a job
@ -71,4 +71,9 @@ public class JobStatusChangedEvent implements HistoryEvent {
return tEvent;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
return null;
}
}

View File

@ -21,7 +21,9 @@ package org.apache.hadoop.mapreduce.jobhistory;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobACL;
@ -29,8 +31,7 @@ import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
/**
* Event to record the submission of a job
@ -229,4 +230,9 @@ public class JobSubmittedEvent implements HistoryEvent {
return tEvent;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
return null;
}
}

View File

@ -18,7 +18,8 @@
package org.apache.hadoop.mapreduce.jobhistory;
import com.google.common.base.Joiner;
import java.util.Collections;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
@ -26,8 +27,9 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import java.util.Collections;
import com.google.common.base.Joiner;
/**
* Event to record Failed and Killed completion of jobs
@ -135,4 +137,9 @@ public class JobUnsuccessfulCompletionEvent implements HistoryEvent {
tEvent.addInfo("FINISHED_REDUCES", getFinishedReduces());
return tEvent;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
return null;
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -29,6 +31,7 @@ import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
/**
* Event to record successful completion of a map attempt
@ -230,8 +233,6 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
tEvent.addInfo("STATUS", getTaskStatus());
tEvent.addInfo("STATE", getState());
tEvent.addInfo("MAP_FINISH_TIME", getMapFinishTime());
tEvent.addInfo("COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(getCounters()));
tEvent.addInfo("HOSTNAME", getHostname());
tEvent.addInfo("PORT", getPort());
tEvent.addInfo("RACK_NAME", getRackName());
@ -239,5 +240,12 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
"" : getAttemptId().toString());
return tEvent;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
Set<TimelineMetric> metrics = JobHistoryEventUtils
.countersToTimelineMetric(getCounters(), finishTime);
return metrics;
}
}

View File

@ -17,11 +17,14 @@
*/
package org.apache.hadoop.mapreduce.jobhistory;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
/**
* Event to record the normalized map/reduce requirements.
@ -82,4 +85,9 @@ public class NormalizedResourceEvent implements HistoryEvent {
tEvent.addInfo("TASK_TYPE", getTaskType());
return tEvent;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
return null;
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -29,6 +31,7 @@ import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
/**
* Event to record successful completion of a reduce attempt
@ -238,12 +241,17 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
tEvent.addInfo("STATE", getState());
tEvent.addInfo("SHUFFLE_FINISH_TIME", getShuffleFinishTime());
tEvent.addInfo("SORT_FINISH_TIME", getSortFinishTime());
tEvent.addInfo("COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(getCounters()));
tEvent.addInfo("HOSTNAME", getHostname());
tEvent.addInfo("PORT", getPort());
tEvent.addInfo("RACK_NAME", getRackName());
return tEvent;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
Set<TimelineMetric> metrics = JobHistoryEventUtils
.countersToTimelineMetric(getCounters(), finishTime);
return metrics;
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -28,6 +30,7 @@ import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
/**
* Event to record successful task completion
@ -149,10 +152,14 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
tEvent.addInfo("FINISH_TIME", getFinishTime());
tEvent.addInfo("STATUS", getTaskStatus());
tEvent.addInfo("STATE", getState());
tEvent.addInfo("COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(getCounters()));
tEvent.addInfo("HOSTNAME", getHostname());
return tEvent;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
Set<TimelineMetric> metrics = JobHistoryEventUtils
.countersToTimelineMetric(getCounters(), finishTime);
return metrics;
}
}

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@ -25,10 +28,9 @@ import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* Event to record start of a task attempt
@ -151,4 +153,9 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
return tEvent;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
return null;
}
}

View File

@ -18,8 +18,12 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.ProgressSplitsBlock;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@ -28,10 +32,7 @@ import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.mapred.ProgressSplitsBlock;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
/**
* Event to record unsuccessful (Killed/Failed) completion of task attempts
@ -267,9 +268,13 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
tEvent.addInfo("SHUFFLE_FINISH_TIME", getFinishTime());
tEvent.addInfo("SORT_FINISH_TIME", getFinishTime());
tEvent.addInfo("MAP_FINISH_TIME", getFinishTime());
tEvent.addInfo("COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(getCounters()));
return tEvent;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
Set<TimelineMetric> metrics = JobHistoryEventUtils
.countersToTimelineMetric(getCounters(), finishTime);
return metrics;
}
}

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.TaskStatus;
@ -28,8 +31,7 @@ import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
/**
* Event to record the failure of a task
@ -151,9 +153,13 @@ public class TaskFailedEvent implements HistoryEvent {
tEvent.addInfo("ERROR", getError());
tEvent.addInfo("FAILED_ATTEMPT_ID",
getFailedAttemptID() == null ? "" : getFailedAttemptID().toString());
tEvent.addInfo("COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(getCounters()));
return tEvent;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
Set<TimelineMetric> metrics = JobHistoryEventUtils
.countersToTimelineMetric(getCounters(), finishTime);
return metrics;
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -29,6 +31,7 @@ import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
/**
* Event to record the successful completion of a task
@ -124,8 +127,6 @@ public class TaskFinishedEvent implements HistoryEvent {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("TASK_TYPE", getTaskType().toString());
tEvent.addInfo("COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(getCounters()));
tEvent.addInfo("FINISH_TIME", getFinishTime());
tEvent.addInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
tEvent.addInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
@ -133,5 +134,11 @@ public class TaskFinishedEvent implements HistoryEvent {
getSuccessfulTaskAttemptId().toString());
return tEvent;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
Set<TimelineMetric> jobMetrics = JobHistoryEventUtils
.countersToTimelineMetric(getCounters(), finishTime);
return jobMetrics;
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -25,6 +27,7 @@ import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
/**
* Event to record the start of a task
@ -84,4 +87,9 @@ public class TaskStartedEvent implements HistoryEvent {
return tEvent;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
return null;
}
}

View File

@ -18,15 +18,15 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
/**
* Event to record updates to a task
@ -71,4 +71,9 @@ public class TaskUpdatedEvent implements HistoryEvent {
return tEvent;
}
@Override
public Set<TimelineMetric> getTimelineMetrics() {
return null;
}
}

View File

@ -17,10 +17,13 @@
*/
package org.apache.hadoop.mapreduce.util;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
@ -53,4 +56,20 @@ public final class JobHistoryEventUtils {
return nodes;
}
public static Set<TimelineMetric> countersToTimelineMetric(Counters counters,
long timestamp) {
Set<TimelineMetric> entityMetrics = new HashSet<TimelineMetric>();
for (CounterGroup g : counters) {
String groupName = g.getName();
for (Counter c : g) {
String name = groupName + ":" + c.getName();
TimelineMetric metric = new TimelineMetric();
metric.setId(name);
metric.addValue(timestamp, c.getValue());
entityMetrics.add(metric);
}
}
return entityMetrics;
}
}

View File

@ -18,7 +18,12 @@
package org.apache.hadoop.mapred;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
@ -38,11 +43,13 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.Assert;
@ -200,7 +207,7 @@ public class TestMRTimelineEventHandling {
Assert.assertEquals(apps.size(), 1);
ApplicationReport appReport = apps.get(0);
firstAppId = appReport.getApplicationId();
UtilsForTests.waitForAppFinished(job, cluster);
checkNewTimelineEvent(firstAppId, appReport);
LOG.info("Run 2nd job which should be failed.");
@ -213,6 +220,7 @@ public class TestMRTimelineEventHandling {
appReport = apps.get(0).getApplicationId().equals(firstAppId) ?
apps.get(0) : apps.get(1);
checkNewTimelineEvent(firstAppId, appReport);
} finally {
@ -262,6 +270,27 @@ public class TestMRTimelineEventHandling {
Assert.assertTrue("jobEventFilePath: " + jobEventFilePath +
" does not exist.",
jobEventFile.exists());
verifyMetricsWhenEvent(jobEventFile, EventType.JOB_FINISHED.name());
// for this test, we expect MR job metrics are published in YARN_APPLICATION
String outputAppDir = basePath + "/YARN_APPLICATION/";
entityFolder = new File(outputAppDir);
Assert.assertTrue(
"Job output directory: " + outputAppDir +
" does not exist.",
entityFolder.isDirectory());
// check for job event file
String appEventFileName = appId.toString()
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
String appEventFilePath = outputAppDir + appEventFileName;
File appEventFile = new File(appEventFilePath);
Assert.assertTrue(
"appEventFilePath: " + appEventFilePath +
" does not exist.",
appEventFile.exists());
verifyMetricsWhenEvent(appEventFile, null);
// check for task event file
String outputDirTask = basePath + "/MAPREDUCE_TASK/";
@ -278,6 +307,7 @@ public class TestMRTimelineEventHandling {
Assert.assertTrue("taskEventFileName: " + taskEventFilePath +
" does not exist.",
taskEventFile.exists());
verifyMetricsWhenEvent(taskEventFile, EventType.TASK_FINISHED.name());
// check for task attempt event file
String outputDirTaskAttempt = basePath + "/MAPREDUCE_TASK_ATTEMPT/";
@ -294,6 +324,48 @@ public class TestMRTimelineEventHandling {
File taskAttemptEventFile = new File(taskAttemptEventFilePath);
Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath +
" does not exist.", taskAttemptEventFile.exists());
verifyMetricsWhenEvent(taskAttemptEventFile,
EventType.MAP_ATTEMPT_FINISHED.name());
}
private void verifyMetricsWhenEvent(File entityFile, String eventId)
throws IOException {
BufferedReader reader = null;
String strLine;
try {
reader = new BufferedReader(new FileReader(entityFile));
boolean jobMetricsFoundForAppEntity = false;
while ((strLine = reader.readLine()) != null) {
if (strLine.trim().length() > 0) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
FileSystemTimelineReaderImpl.getTimelineRecordFromJSON(
strLine.trim(),
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.class);
if (eventId == null) {
// Job metrics are published without any events for
// ApplicationEntity. There is also possibility that some other
// ApplicationEntity is published without events, hence loop all
if (entity.getEvents().size() == 0) {
jobMetricsFoundForAppEntity = entity.getMetrics().size() > 0;
if (jobMetricsFoundForAppEntity) {
return;
}
}
} else {
for (TimelineEvent event : entity.getEvents()) {
if (event.getId().equals(eventId)) {
assertTrue(entity.getMetrics().size() > 0);
return;
}
}
}
}
}
fail("Expected event : " + eventId + " not found in the file "
+ entityFile);
} finally {
reader.close();
}
}
@Test

View File

@ -18,9 +18,9 @@
package org.apache.hadoop.mapred;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
@ -31,8 +31,10 @@ import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -42,20 +44,24 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.commons.logging.Log;
import com.google.common.base.Supplier;
/**
* Utilities used in unit test.
@ -607,6 +613,29 @@ public class UtilsForTests {
return job;
}
public static void waitForAppFinished(RunningJob job, MiniMRYarnCluster cluster)
throws IOException {
ApplicationId appId = ApplicationId.newInstance(
Long.parseLong(job.getID().getJtIdentifier()), job.getID().getId());
ConcurrentMap<ApplicationId, RMApp> rmApps =
cluster.getResourceManager().getRMContext().getRMApps();
if (!rmApps.containsKey(appId)) {
throw new IOException("Job not found");
}
final RMApp rmApp = rmApps.get(appId);
try {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return RMAppImpl.isAppInFinalState(rmApp);
}
}, 1000, 1000 * 180);
} catch (TimeoutException | InterruptedException e1) {
throw new IOException("Yarn application with " + appId + " didn't finish "
+ "did not reach finale State", e1);
}
}
// Run a job that will be succeeded and wait until it completes
public static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir)
throws IOException {