MAPREDUCE-6327. Made MR AM use timeline service v2 API to write history events and counters. Contributed by Junping Du.

This commit is contained in:
Zhijie Shen 2015-04-21 16:31:33 -07:00 committed by Sangjin Lee
parent 2bdefbc4a0
commit b50a6d78f5
30 changed files with 826 additions and 27 deletions

View File

@ -19,6 +19,9 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@ -50,11 +53,13 @@ import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
@ -76,6 +81,8 @@ import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.JsonNodeFactory;
import org.codehaus.jackson.node.ObjectNode;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientHandlerException;
@ -127,13 +134,23 @@ public class JobHistoryEventHandler extends AbstractService
protected static final Map<JobId, MetaInfo> fileMap =
Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
// For posting entities in new timeline service in a non-blocking way
// TODO YARN-3367 replace with event loop in TimelineClient.
private static ExecutorService threadPool =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
.build());
// should job completion be force when the AM shuts down?
protected volatile boolean forceJobCompletion = false;
protected TimelineClient timelineClient;
private boolean newTimelineServiceEnabled = false;
private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
private static String MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE = "MAPREDUCE_TASK_ATTEMPT";
public JobHistoryEventHandler(AppContext context, int startCount) {
super("JobHistoryEventHandler");
@ -253,13 +270,22 @@ public class JobHistoryEventHandler extends AbstractService
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
// TODO replace MR specific configurations on timeline service with getting
// configuration from RM through registerApplicationMaster() in
// ApplicationMasterProtocol with return value for timeline service
// configuration status: off, on_with_v1 or on_with_v2.
if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
timelineClient = TimelineClient.createTimelineClient();
timelineClient =
((MRAppMaster.RunningAppContext)context).getTimelineClient();
timelineClient.init(conf);
LOG.info("Timeline service is enabled");
newTimelineServiceEnabled = conf.getBoolean(
MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED,
MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED);
LOG.info("Timeline service is enabled: " + (newTimelineServiceEnabled? "v2" : "v1"));
LOG.info("Emitting job history data to the timeline server is enabled");
} else {
LOG.info("Timeline service is not enabled");
@ -433,10 +459,27 @@ public class JobHistoryEventHandler extends AbstractService
if (timelineClient != null) {
timelineClient.stop();
}
shutdownAndAwaitTermination();
LOG.info("Stopped JobHistoryEventHandler. super.stop()");
super.serviceStop();
}
// TODO remove threadPool after adding non-blocking call in TimelineClient
private static void shutdownAndAwaitTermination() {
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
LOG.error("ThreadPool did not terminate");
}
} catch (InterruptedException ie) {
threadPool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
protected EventWriter createEventWriter(Path historyFilePath)
throws IOException {
FSDataOutputStream out = stagingDirFS.create(historyFilePath, true);
@ -590,9 +633,14 @@ public class JobHistoryEventHandler extends AbstractService
processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
event.getJobID());
if (timelineClient != null) {
if (newTimelineServiceEnabled) {
processEventForNewTimelineService(historyEvent, event.getJobID(),
event.getTimestamp());
} else {
processEventForTimelineServer(historyEvent, event.getJobID(),
event.getTimestamp());
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("In HistoryEventHandler "
+ event.getHistoryEvent().getEventType());
@ -832,11 +880,11 @@ public class JobHistoryEventHandler extends AbstractService
tEvent.addEventInfo("FINISHED_MAPS", jfe.getFinishedMaps());
tEvent.addEventInfo("FINISHED_REDUCES", jfe.getFinishedReduces());
tEvent.addEventInfo("MAP_COUNTERS_GROUPS",
countersToJSON(jfe.getMapCounters()));
JobHistoryEventUtils.countersToJSON(jfe.getMapCounters()));
tEvent.addEventInfo("REDUCE_COUNTERS_GROUPS",
countersToJSON(jfe.getReduceCounters()));
JobHistoryEventUtils.countersToJSON(jfe.getReduceCounters()));
tEvent.addEventInfo("TOTAL_COUNTERS_GROUPS",
countersToJSON(jfe.getTotalCounters()));
JobHistoryEventUtils.countersToJSON(jfe.getTotalCounters()));
tEvent.addEventInfo("JOB_STATUS", JobState.SUCCEEDED.toString());
tEntity.addEvent(tEvent);
tEntity.setEntityId(jobId.toString());
@ -862,7 +910,7 @@ public class JobHistoryEventHandler extends AbstractService
tfe.getFailedAttemptID() == null ?
"" : tfe.getFailedAttemptID().toString());
tEvent.addEventInfo("COUNTERS_GROUPS",
countersToJSON(tfe.getCounters()));
JobHistoryEventUtils.countersToJSON(tfe.getCounters()));
tEntity.addEvent(tEvent);
tEntity.setEntityId(tfe.getTaskId().toString());
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
@ -880,7 +928,7 @@ public class JobHistoryEventHandler extends AbstractService
TaskFinishedEvent tfe2 = (TaskFinishedEvent) event;
tEvent.addEventInfo("TASK_TYPE", tfe2.getTaskType().toString());
tEvent.addEventInfo("COUNTERS_GROUPS",
countersToJSON(tfe2.getCounters()));
JobHistoryEventUtils.countersToJSON(tfe2.getCounters()));
tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime());
tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
@ -902,7 +950,6 @@ public class JobHistoryEventHandler extends AbstractService
tEvent.addEventInfo("START_TIME", tase.getStartTime());
tEvent.addEventInfo("HTTP_PORT", tase.getHttpPort());
tEvent.addEventInfo("TRACKER_NAME", tase.getTrackerName());
tEvent.addEventInfo("TASK_TYPE", tase.getTaskType().toString());
tEvent.addEventInfo("SHUFFLE_PORT", tase.getShufflePort());
tEvent.addEventInfo("CONTAINER_ID", tase.getContainerId() == null ?
"" : tase.getContainerId().toString());
@ -935,7 +982,7 @@ public class JobHistoryEventHandler extends AbstractService
tEvent.addEventInfo("SORT_FINISH_TIME", tauce.getFinishTime());
tEvent.addEventInfo("MAP_FINISH_TIME", tauce.getFinishTime());
tEvent.addEventInfo("COUNTERS_GROUPS",
countersToJSON(tauce.getCounters()));
JobHistoryEventUtils.countersToJSON(tauce.getCounters()));
tEntity.addEvent(tEvent);
tEntity.setEntityId(tauce.getTaskId().toString());
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
@ -949,7 +996,7 @@ public class JobHistoryEventHandler extends AbstractService
tEvent.addEventInfo("STATE", mafe.getState());
tEvent.addEventInfo("MAP_FINISH_TIME", mafe.getMapFinishTime());
tEvent.addEventInfo("COUNTERS_GROUPS",
countersToJSON(mafe.getCounters()));
JobHistoryEventUtils.countersToJSON(mafe.getCounters()));
tEvent.addEventInfo("HOSTNAME", mafe.getHostname());
tEvent.addEventInfo("PORT", mafe.getPort());
tEvent.addEventInfo("RACK_NAME", mafe.getRackName());
@ -971,7 +1018,7 @@ public class JobHistoryEventHandler extends AbstractService
tEvent.addEventInfo("SHUFFLE_FINISH_TIME", rafe.getShuffleFinishTime());
tEvent.addEventInfo("SORT_FINISH_TIME", rafe.getSortFinishTime());
tEvent.addEventInfo("COUNTERS_GROUPS",
countersToJSON(rafe.getCounters()));
JobHistoryEventUtils.countersToJSON(rafe.getCounters()));
tEvent.addEventInfo("HOSTNAME", rafe.getHostname());
tEvent.addEventInfo("PORT", rafe.getPort());
tEvent.addEventInfo("RACK_NAME", rafe.getRackName());
@ -990,7 +1037,7 @@ public class JobHistoryEventHandler extends AbstractService
tEvent.addEventInfo("STATUS", tafe.getTaskStatus());
tEvent.addEventInfo("STATE", tafe.getState());
tEvent.addEventInfo("COUNTERS_GROUPS",
countersToJSON(tafe.getCounters()));
JobHistoryEventUtils.countersToJSON(tafe.getCounters()));
tEvent.addEventInfo("HOSTNAME", tafe.getHostname());
tEntity.addEvent(tEvent);
tEntity.setEntityId(tafe.getTaskId().toString());
@ -1060,6 +1107,165 @@ public class JobHistoryEventHandler extends AbstractService
return nodes;
}
private void putEntityWithoutBlocking(final TimelineClient timelineClient,
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity) {
Runnable publishWrapper = new Runnable() {
public void run() {
try {
timelineClient.putEntities(entity);
} catch (IOException|YarnException e) {
LOG.error("putEntityNonBlocking get failed: " + e);
throw new RuntimeException(e.toString());
}
}
};
threadPool.execute(publishWrapper);
}
// create JobEntity from HistoryEvent with adding other info, like:
// jobId, timestamp and entityType.
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createJobEntity(HistoryEvent event, long timestamp, JobId jobId,
String entityType) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
createBaseEntity(event, timestamp, entityType);
entity.setId(jobId.toString());
return entity;
}
// create BaseEntity from HistoryEvent with adding other info, like:
// timestamp and entityType.
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createBaseEntity(HistoryEvent event, long timestamp, String entityType) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent tEvent =
event.toTimelineEvent();
tEvent.setTimestamp(timestamp);
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.addEvent(tEvent);
entity.setType(entityType);
return entity;
}
// create TaskEntity from HistoryEvent with adding other info, like:
// taskId, jobId, timestamp, entityType and relatedJobEntity.
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createTaskEntity(HistoryEvent event, long timestamp, String taskId,
String entityType, String relatedJobEntity, JobId jobId) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
createBaseEntity(event, timestamp, entityType);
entity.setId(taskId);
entity.addIsRelatedToEntity(relatedJobEntity, jobId.toString());
return entity;
}
// create TaskAttemptEntity from HistoryEvent with adding other info, like:
// timestamp, taskAttemptId, entityType, relatedTaskEntity and taskId.
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createTaskAttemptEntity(HistoryEvent event, long timestamp,
String taskAttemptId, String entityType, String relatedTaskEntity,
String taskId) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
createBaseEntity(event, timestamp, entityType);
entity.setId(taskAttemptId);
entity.addIsRelatedToEntity(relatedTaskEntity, taskId);
return entity;
}
private void processEventForNewTimelineService(HistoryEvent event, JobId jobId,
long timestamp) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity tEntity = null;
String taskId = null;
String taskAttemptId = null;
switch (event.getEventType()) {
// Handle job events
case JOB_SUBMITTED:
case JOB_STATUS_CHANGED:
case JOB_INFO_CHANGED:
case JOB_INITED:
case JOB_PRIORITY_CHANGED:
case JOB_QUEUE_CHANGED:
case JOB_FAILED:
case JOB_KILLED:
case JOB_ERROR:
case JOB_FINISHED:
case AM_STARTED:
case NORMALIZED_RESOURCE:
break;
// Handle task events
case TASK_STARTED:
taskId = ((TaskStartedEvent)event).getTaskId().toString();
break;
case TASK_FAILED:
taskId = ((TaskFailedEvent)event).getTaskId().toString();
break;
case TASK_UPDATED:
taskId = ((TaskUpdatedEvent)event).getTaskId().toString();
break;
case TASK_FINISHED:
taskId = ((TaskFinishedEvent)event).getTaskId().toString();
break;
case MAP_ATTEMPT_STARTED:
case CLEANUP_ATTEMPT_STARTED:
case REDUCE_ATTEMPT_STARTED:
case SETUP_ATTEMPT_STARTED:
taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptStartedEvent)event).
getTaskAttemptId().toString();
break;
case MAP_ATTEMPT_FAILED:
case CLEANUP_ATTEMPT_FAILED:
case REDUCE_ATTEMPT_FAILED:
case SETUP_ATTEMPT_FAILED:
case MAP_ATTEMPT_KILLED:
case CLEANUP_ATTEMPT_KILLED:
case REDUCE_ATTEMPT_KILLED:
case SETUP_ATTEMPT_KILLED:
taskId = ((TaskAttemptUnsuccessfulCompletionEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptUnsuccessfulCompletionEvent)event).
getTaskAttemptId().toString();
break;
case MAP_ATTEMPT_FINISHED:
taskId = ((MapAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((MapAttemptFinishedEvent)event).getAttemptId().toString();
break;
case REDUCE_ATTEMPT_FINISHED:
taskId = ((ReduceAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((ReduceAttemptFinishedEvent)event).getAttemptId().toString();
break;
case SETUP_ATTEMPT_FINISHED:
case CLEANUP_ATTEMPT_FINISHED:
taskId = ((TaskAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptFinishedEvent)event).getAttemptId().toString();
break;
default:
LOG.warn("EventType: " + event.getEventType() + " cannot be recognized" +
" and handled by timeline service.");
return;
}
if (taskId == null) {
// JobEntity
tEntity = createJobEntity(event, timestamp, jobId,
MAPREDUCE_JOB_ENTITY_TYPE);
} else {
if (taskAttemptId == null) {
// TaskEntity
tEntity = createTaskEntity(event, timestamp, taskId,
MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE, jobId);
} else {
// TaskAttemptEntity
tEntity = createTaskAttemptEntity(event, timestamp, taskAttemptId,
MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE, MAPREDUCE_TASK_ENTITY_TYPE,
taskId);
}
}
putEntityWithoutBlocking(timelineClient, tEntity);
}
private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
Counter slotMillisMapCounter = allCounters

View File

@ -139,6 +139,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@ -1065,6 +1066,7 @@ public class MRAppMaster extends CompositeService {
private final Configuration conf;
private final ClusterInfo clusterInfo = new ClusterInfo();
private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
private TimelineClient timelineClient = null;
private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;
@ -1074,6 +1076,23 @@ public class MRAppMaster extends CompositeService {
this.clientToAMTokenSecretManager =
new ClientToAMTokenSecretManager(appAttemptID, null);
this.taskAttemptFinishingMonitor = taskAttemptFinishingMonitor;
if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)
&& conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
boolean newTimelineServiceEnabled = conf.getBoolean(
MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED,
MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED);
if (newTimelineServiceEnabled) {
// create new version TimelineClient
timelineClient = TimelineClient.createTimelineClient(
appAttemptID.getApplicationId());
} else {
timelineClient = TimelineClient.createTimelineClient();
}
}
}
@Override
@ -1164,6 +1183,10 @@ public class MRAppMaster extends CompositeService {
return taskAttemptFinishingMonitor;
}
// Get Timeline Collector's address (get sync from RM)
public TimelineClient getTimelineClient() {
return timelineClient;
}
}
@SuppressWarnings("unchecked")

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
@ -821,6 +822,14 @@ public class RMContainerAllocator extends RMContainerRequestor
handleUpdatedNodes(response);
handleJobPriorityChange(response);
String collectorAddr = response.getCollectorAddr();
MRAppMaster.RunningAppContext appContext =
(MRAppMaster.RunningAppContext)this.getContext();
if (collectorAddr != null && !collectorAddr.isEmpty()
&& appContext.getTimelineClient() != null) {
appContext.getTimelineClient().setTimelineServiceAddress(
response.getCollectorAddr());
}
for (ContainerStatus cont : finishedContainers) {
LOG.info("Received completed container " + cont.getContainerId());

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.junit.Test;
public class TestEvents {
@ -405,6 +406,11 @@ public class TestEvents {
this.datum = datum;
}
@Override
public TimelineEvent toTimelineEvent() {
return null;
}
}
}

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
@ -669,7 +670,7 @@ public class TestJobHistoryEventHandler {
group2.addCounter("MARTHA_JONES", "Martha Jones", 3);
group2.addCounter("DONNA_NOBLE", "Donna Noble", 2);
group2.addCounter("ROSE_TYLER", "Rose Tyler", 1);
JsonNode jsonNode = jheh.countersToJSON(counters);
JsonNode jsonNode = JobHistoryEventUtils.countersToJSON(counters);
String jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
String expected = "[{\"NAME\":\"COMPANIONS\",\"DISPLAY_NAME\":\"Companions "
+ "of the Doctor\",\"COUNTERS\":[{\"NAME\":\"AMY_POND\",\"DISPLAY_NAME\""
@ -692,19 +693,19 @@ public class TestJobHistoryEventHandler {
public void testCountersToJSONEmpty() throws Exception {
JobHistoryEventHandler jheh = new JobHistoryEventHandler(null, 0);
Counters counters = null;
JsonNode jsonNode = jheh.countersToJSON(counters);
JsonNode jsonNode = JobHistoryEventUtils.countersToJSON(counters);
String jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
String expected = "[]";
Assert.assertEquals(expected, jsonStr);
counters = new Counters();
jsonNode = jheh.countersToJSON(counters);
jsonNode = JobHistoryEventUtils.countersToJSON(counters);
jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
expected = "[]";
Assert.assertEquals(expected, jsonStr);
counters.addGroup("DOCTORS", "Incarnations of the Doctor");
jsonNode = jheh.countersToJSON(counters);
jsonNode = JobHistoryEventUtils.countersToJSON(counters);
jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
expected = "[{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\":\"Incarnations of the "
+ "Doctor\",\"COUNTERS\":[]}]";

View File

@ -475,6 +475,11 @@ public interface MRJobConfig {
public static final boolean DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA =
false;
public static final String MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED =
"mapreduce.job.new-timeline-service.enabled";
public static final boolean DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED =
false;
public static final String MR_PREFIX = "yarn.app.mapreduce.";
public static final String MR_AM_PREFIX = MR_PREFIX + "am.";

View File

@ -20,8 +20,10 @@ package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.avro.util.Utf8;
@ -166,4 +168,20 @@ public class AMStartedEvent implements HistoryEvent {
public EventType getEventType() {
return EventType.AM_STARTED;
}
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("APPLICATION_ATTEMPT_ID",
getAppAttemptId() == null ? "" : getAppAttemptId().toString());
tEvent.addInfo("CONTAINER_ID", getContainerId() == null ?
"" : getContainerId().toString());
tEvent.addInfo("NODE_MANAGER_HOST", getNodeManagerHost());
tEvent.addInfo("NODE_MANAGER_PORT", getNodeManagerPort());
tEvent.addInfo("NODE_MANAGER_HTTP_PORT", getNodeManagerHttpPort());
tEvent.addInfo("START_TIME", getStartTime());
return tEvent;
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
/**
* Interface for event wrapper classes. Implementations each wrap an
@ -37,4 +38,7 @@ public interface HistoryEvent {
/** Set the Avro datum wrapped by this. */
void setDatum(Object datum);
/** Map HistoryEvent to TimelineEvent */
TimelineEvent toTimelineEvent();
}

View File

@ -23,6 +23,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
/**
* Event to record successful completion of job
@ -133,4 +136,26 @@ public class JobFinishedEvent implements HistoryEvent {
public Counters getReduceCounters() {
return reduceCounters;
}
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("FINISH_TIME", getFinishTime());
tEvent.addInfo("NUM_MAPS", getFinishedMaps());
tEvent.addInfo("NUM_REDUCES", getFinishedReduces());
tEvent.addInfo("FAILED_MAPS", getFailedMaps());
tEvent.addInfo("FAILED_REDUCES", getFailedReduces());
tEvent.addInfo("FINISHED_MAPS", getFinishedMaps());
tEvent.addInfo("FINISHED_REDUCES", getFinishedReduces());
tEvent.addInfo("MAP_COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(getMapCounters()));
tEvent.addInfo("REDUCE_COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(getReduceCounters()));
tEvent.addInfo("TOTAL_COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(getTotalCounters()));
// TODO replace SUCCEEDED with JobState.SUCCEEDED.toString()
tEvent.addInfo("JOB_STATUS", "SUCCEEDED");
return tEvent;
}
}

View File

@ -23,6 +23,8 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
@ -65,4 +67,13 @@ public class JobInfoChangeEvent implements HistoryEvent {
return EventType.JOB_INFO_CHANGED;
}
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("SUBMIT_TIME", getSubmitTime());
tEvent.addInfo("LAUNCH_TIME", getLaunchTime());
return tEvent;
}
}

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
@ -73,4 +75,16 @@ public class JobInitedEvent implements HistoryEvent {
}
/** Get whether the job's map and reduce stages were combined */
public boolean getUberized() { return datum.getUberized(); }
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("START_TIME", getLaunchTime());
tEvent.addInfo("STATUS", getStatus());
tEvent.addInfo("TOTAL_MAPS", getTotalMaps());
tEvent.addInfo("TOTAL_REDUCES", getTotalReduces());
tEvent.addInfo("UBERIZED", getUberized());
return tEvent;
}
}

View File

@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
@ -65,4 +67,12 @@ public class JobPriorityChangeEvent implements HistoryEvent {
return EventType.JOB_PRIORITY_CHANGED;
}
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("PRIORITY", getPriority().toString());
return tEvent;
}
}

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
@SuppressWarnings("deprecation")
public class JobQueueChangeEvent implements HistoryEvent {
@ -60,4 +62,12 @@ public class JobQueueChangeEvent implements HistoryEvent {
return null;
}
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("QUEUE_NAMES", getJobQueueName());
return tEvent;
}
}

View File

@ -23,6 +23,8 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
@ -61,4 +63,12 @@ public class JobStatusChangedEvent implements HistoryEvent {
return EventType.JOB_STATUS_CHANGED;
}
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("STATUS", getStatus());
return tEvent;
}
}

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
@ -206,4 +208,25 @@ public class JobSubmittedEvent implements HistoryEvent {
/** Get the event type */
public EventType getEventType() { return EventType.JOB_SUBMITTED; }
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("SUBMIT_TIME", getSubmitTime());
tEvent.addInfo("QUEUE_NAME", getJobQueueName());
tEvent.addInfo("JOB_NAME", getJobName());
tEvent.addInfo("USER_NAME", getUserName());
tEvent.addInfo("JOB_CONF_PATH", getJobConfPath());
tEvent.addInfo("ACLS", getJobAcls());
tEvent.addInfo("JOB_QUEUE_NAME", getJobQueueName());
tEvent.addInfo("WORKLFOW_ID", getWorkflowId());
tEvent.addInfo("WORKFLOW_NAME", getWorkflowName());
tEvent.addInfo("WORKFLOW_NODE_NAME", getWorkflowNodeName());
tEvent.addInfo("WORKFLOW_ADJACENCIES",
getWorkflowAdjacencies());
tEvent.addInfo("WORKFLOW_TAGS", getWorkflowTags());
return tEvent;
}
}

View File

@ -24,6 +24,8 @@ import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import java.util.Collections;
@ -119,4 +121,18 @@ public class JobUnsuccessfulCompletionEvent implements HistoryEvent {
final CharSequence diagnostics = datum.getDiagnostics();
return diagnostics == null ? NODIAGS : diagnostics.toString();
}
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("FINISH_TIME", getFinishTime());
tEvent.addInfo("NUM_MAPS", getFinishedMaps());
tEvent.addInfo("NUM_REDUCES", getFinishedReduces());
tEvent.addInfo("JOB_STATUS", getStatus());
tEvent.addInfo("DIAGNOSTICS", getDiagnostics());
tEvent.addInfo("FINISHED_MAPS", getFinishedMaps());
tEvent.addInfo("FINISHED_REDUCES", getFinishedReduces());
return tEvent;
}
}

View File

@ -26,6 +26,9 @@ import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
/**
* Event to record successful completion of a map attempt
@ -218,4 +221,23 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
return physMemKbytes;
}
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("TASK_TYPE", getTaskType().toString());
tEvent.addInfo("FINISH_TIME", getFinishTime());
tEvent.addInfo("STATUS", getTaskStatus());
tEvent.addInfo("STATE", getState());
tEvent.addInfo("MAP_FINISH_TIME", getMapFinishTime());
tEvent.addInfo("COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(getCounters()));
tEvent.addInfo("HOSTNAME", getHostname());
tEvent.addInfo("PORT", getPort());
tEvent.addInfo("RACK_NAME", getRackName());
tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ?
"" : getAttemptId().toString());
return tEvent;
}
}

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
/**
* Event to record the normalized map/reduce requirements.
@ -71,4 +73,13 @@ public class NormalizedResourceEvent implements HistoryEvent {
public void setDatum(Object datum) {
throw new UnsupportedOperationException("Not a seriable object");
}
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("MEMORY", "" + getMemory());
tEvent.addInfo("TASK_TYPE", getTaskType());
return tEvent;
}
}

View File

@ -26,6 +26,9 @@ import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
/**
* Event to record successful completion of a reduce attempt
@ -223,4 +226,24 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
return physMemKbytes;
}
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("TASK_TYPE", getTaskType().toString());
tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ?
"" : getAttemptId().toString());
tEvent.addInfo("FINISH_TIME", getFinishTime());
tEvent.addInfo("STATUS", getTaskStatus());
tEvent.addInfo("STATE", getState());
tEvent.addInfo("SHUFFLE_FINISH_TIME", getShuffleFinishTime());
tEvent.addInfo("SORT_FINISH_TIME", getSortFinishTime());
tEvent.addInfo("COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(getCounters()));
tEvent.addInfo("HOSTNAME", getHostname());
tEvent.addInfo("PORT", getPort());
tEvent.addInfo("RACK_NAME", getRackName());
return tEvent;
}
}

View File

@ -25,6 +25,9 @@ import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
/**
* Event to record successful task completion
@ -136,4 +139,20 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
: EventType.REDUCE_ATTEMPT_FINISHED;
}
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("TASK_TYPE", getTaskType().toString());
tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ?
"" : getAttemptId().toString());
tEvent.addInfo("FINISH_TIME", getFinishTime());
tEvent.addInfo("STATUS", getTaskStatus());
tEvent.addInfo("STATE", getState());
tEvent.addInfo("COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(getCounters()));
tEvent.addInfo("HOSTNAME", getHostname());
return tEvent;
}
}

View File

@ -23,8 +23,10 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
@ -133,4 +135,20 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
return null;
}
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("TASK_TYPE", getTaskType().toString());
tEvent.addInfo("TASK_ATTEMPT_ID",
getTaskAttemptId().toString());
tEvent.addInfo("START_TIME", getStartTime());
tEvent.addInfo("HTTP_PORT", getHttpPort());
tEvent.addInfo("TRACKER_NAME", getTrackerName());
tEvent.addInfo("SHUFFLE_PORT", getShufflePort());
tEvent.addInfo("CONTAINER_ID", getContainerId() == null ?
"" : getContainerId().toString());
return tEvent;
}
}

View File

@ -25,6 +25,9 @@ import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.mapred.ProgressSplitsBlock;
@ -248,4 +251,25 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
return physMemKbytes;
}
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("TASK_TYPE", getTaskType().toString());
tEvent.addInfo("TASK_ATTEMPT_ID", getTaskAttemptId() == null ?
"" : getTaskAttemptId().toString());
tEvent.addInfo("FINISH_TIME", getFinishTime());
tEvent.addInfo("ERROR", getError());
tEvent.addInfo("STATUS", getTaskStatus());
tEvent.addInfo("HOSTNAME", getHostname());
tEvent.addInfo("PORT", getPort());
tEvent.addInfo("RACK_NAME", getRackName());
tEvent.addInfo("SHUFFLE_FINISH_TIME", getFinishTime());
tEvent.addInfo("SORT_FINISH_TIME", getFinishTime());
tEvent.addInfo("MAP_FINISH_TIME", getFinishTime());
tEvent.addInfo("COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(getCounters()));
return tEvent;
}
}

View File

@ -20,10 +20,14 @@ package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
@ -137,4 +141,19 @@ public class TaskFailedEvent implements HistoryEvent {
return EventType.TASK_FAILED;
}
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("TASK_TYPE", getTaskType().toString());
tEvent.addInfo("STATUS", TaskStatus.State.FAILED.toString());
tEvent.addInfo("FINISH_TIME", getFinishTime());
tEvent.addInfo("ERROR", getError());
tEvent.addInfo("FAILED_ATTEMPT_ID",
getFailedAttemptID() == null ? "" : getFailedAttemptID().toString());
tEvent.addInfo("COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(getCounters()));
return tEvent;
}
}

View File

@ -21,10 +21,14 @@ package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
/**
* Event to record the successful completion of a task
@ -115,5 +119,19 @@ public class TaskFinishedEvent implements HistoryEvent {
return EventType.TASK_FINISHED;
}
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("TASK_TYPE", getTaskType().toString());
tEvent.addInfo("COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(getCounters()));
tEvent.addInfo("FINISH_TIME", getFinishTime());
tEvent.addInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
tEvent.addInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
getSuccessfulTaskAttemptId() == null ? "" :
getSuccessfulTaskAttemptId().toString());
return tEvent;
}
}

View File

@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
/**
* Event to record the start of a task
@ -72,4 +74,14 @@ public class TaskStartedEvent implements HistoryEvent {
return EventType.TASK_STARTED;
}
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("TASK_TYPE", getTaskType().toString());
tEvent.addInfo("START_TIME", getStartTime());
tEvent.addInfo("SPLIT_LOCATIONS", getSplitLocations());
return tEvent;
}
}

View File

@ -23,6 +23,8 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
@ -61,4 +63,12 @@ public class TaskUpdatedEvent implements HistoryEvent {
return EventType.TASK_UPDATED;
}
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("FINISH_TIME", getFinishTime());
return tEvent;
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.util;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.ObjectNode;
public class JobHistoryEventUtils {
public static JsonNode countersToJSON(Counters counters) {
ObjectMapper mapper = new ObjectMapper();
ArrayNode nodes = mapper.createArrayNode();
if (counters != null) {
for (CounterGroup counterGroup : counters) {
ObjectNode groupNode = nodes.addObject();
groupNode.put("NAME", counterGroup.getName());
groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName());
ArrayNode countersNode = groupNode.putArray("COUNTERS");
for (Counter counter : counterGroup) {
ObjectNode counterNode = countersNode.addObject();
counterNode.put("NAME", counter.getName());
counterNode.put("DISPLAY_NAME", counter.getDisplayName());
counterNode.put("VALUE", counter.getValue());
}
}
}
return nodes;
}
}

View File

@ -637,6 +637,13 @@
</description>
</property>
<property>
<name>mapreduce.job.new-timeline-service.enabled</name>
<value>false</value>
<description>Specifies if posting job and task events to new timeline service.
</description>
</property>
<property>
<name>mapreduce.input.fileinputformat.split.minsize</name>
<value>0</value>

View File

@ -18,23 +18,46 @@
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.TestJobHistoryEventHandler;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.Assert;
import org.junit.Test;
public class TestMRTimelineEventHandling {
private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
private static final Log LOG =
LogFactory.getLog(TestMRTimelineEventHandling.class);
@Test
public void testTimelineServiceStartInMiniCluster() throws Exception {
Configuration conf = new YarnConfiguration();
@ -48,7 +71,7 @@ public class TestMRTimelineEventHandling {
MiniMRYarnCluster cluster = null;
try {
cluster = new MiniMRYarnCluster(
TestJobHistoryEventHandler.class.getSimpleName(), 1);
TestMRTimelineEventHandling.class.getSimpleName(), 1);
cluster.init(conf);
cluster.start();
@ -89,7 +112,7 @@ public class TestMRTimelineEventHandling {
MiniMRYarnCluster cluster = null;
try {
cluster = new MiniMRYarnCluster(
TestJobHistoryEventHandler.class.getSimpleName(), 1);
TestMRTimelineEventHandling.class.getSimpleName(), 1);
cluster.init(conf);
cluster.start();
conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
@ -138,6 +161,140 @@ public class TestMRTimelineEventHandling {
}
}
@Test
public void testMRNewTimelineServiceEventHandling() throws Exception {
LOG.info("testMRNewTimelineServiceEventHandling start.");
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
// enable new timeline serivce in MR side
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, true);
// enable aux-service based timeline collectors
conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
+ ".class", PerNodeTimelineCollectorsAuxService.class.getName());
conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
MiniMRYarnCluster cluster = null;
try {
cluster = new MiniMRYarnCluster(
TestMRTimelineEventHandling.class.getSimpleName(), 1, true);
cluster.init(conf);
cluster.start();
LOG.info("A MiniMRYarnCluster get start.");
Path inDir = new Path("input");
Path outDir = new Path("output");
LOG.info("Run 1st job which should be successful.");
RunningJob job =
UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
Assert.assertEquals(JobStatus.SUCCEEDED,
job.getJobStatus().getState().getValue());
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(new Configuration(cluster.getConfig()));
yarnClient.start();
EnumSet<YarnApplicationState> appStates =
EnumSet.allOf(YarnApplicationState.class);
ApplicationId firstAppId = null;
List<ApplicationReport> apps = yarnClient.getApplications(appStates);
Assert.assertEquals(apps.size(), 1);
ApplicationReport appReport = apps.get(0);
firstAppId = appReport.getApplicationId();
checkNewTimelineEvent(firstAppId);
LOG.info("Run 2nd job which should be failed.");
job = UtilsForTests.runJobFail(new JobConf(conf), inDir, outDir);
Assert.assertEquals(JobStatus.FAILED,
job.getJobStatus().getState().getValue());
apps = yarnClient.getApplications(appStates);
Assert.assertEquals(apps.size(), 2);
ApplicationId secAppId = null;
secAppId = apps.get(0).getApplicationId() == firstAppId ?
apps.get(1).getApplicationId() : apps.get(0).getApplicationId();
checkNewTimelineEvent(firstAppId);
} finally {
if (cluster != null) {
cluster.stop();
}
// Cleanup test file
String testRoot =
FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
File testRootFolder = new File(testRoot);
if(testRootFolder.isDirectory()) {
FileUtils.deleteDirectory(testRootFolder);
}
}
}
private void checkNewTimelineEvent(ApplicationId appId) throws IOException {
String tmpRoot =
FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
+ "/entities/";
File tmpRootFolder = new File(tmpRoot);
Assert.assertTrue(tmpRootFolder.isDirectory());
String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
UserGroupInformation.getCurrentUser().getShortUserName() +
"/" + TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) +
"/1/1/" + appId.toString();
// for this test, we expect MAPREDUCE_JOB and MAPREDUCE_TASK dirs
String outputDirJob = basePath + "/MAPREDUCE_JOB/";
File entityFolder = new File(outputDirJob);
Assert.assertTrue("Job output directory: " + outputDirJob + " is not exist.",
entityFolder.isDirectory());
// check for job event file
String jobEventFileName = appId.toString().replaceAll("application", "job")
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
String jobEventFilePath = outputDirJob + jobEventFileName;
File jobEventFile = new File(jobEventFilePath);
Assert.assertTrue("jobEventFilePath: " + jobEventFilePath + " is not exist.",
jobEventFile.exists());
// check for task event file
String outputDirTask = basePath + "/MAPREDUCE_TASK/";
File taskFolder = new File(outputDirTask);
Assert.assertTrue("Task output directory: " + outputDirTask + " is not exist.",
taskFolder.isDirectory());
String taskEventFileName = appId.toString().replaceAll("application", "task")
+ "_m_000000" + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
String taskEventFilePath = outputDirTask + taskEventFileName;
File taskEventFile = new File(taskEventFilePath);
Assert.assertTrue("taskEventFileName: " + taskEventFilePath + " is not exist.",
taskEventFile.exists());
// check for task attempt event file
String outputDirTaskAttempt = basePath + "/MAPREDUCE_TASK_ATTEMPT/";
File taskAttemptFolder = new File(outputDirTaskAttempt);
Assert.assertTrue("TaskAttempt output directory: " + outputDirTaskAttempt +
" is not exist.", taskAttemptFolder.isDirectory());
String taskAttemptEventFileName = appId.toString().replaceAll(
"application", "attempt") + "_m_000000_0" +
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
String taskAttemptEventFilePath = outputDirTaskAttempt +
taskAttemptEventFileName;
File taskAttemptEventFile = new File(taskAttemptEventFilePath);
Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath +
" is not exist.", taskAttemptEventFile.exists());
}
@Test
public void testMapreduceJobTimelineServiceEnabled()
throws Exception {
@ -147,7 +304,7 @@ public class TestMRTimelineEventHandling {
MiniMRYarnCluster cluster = null;
try {
cluster = new MiniMRYarnCluster(
TestJobHistoryEventHandler.class.getSimpleName(), 1);
TestMRTimelineEventHandling.class.getSimpleName(), 1);
cluster.init(conf);
cluster.start();
conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,

View File

@ -66,6 +66,7 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
private static final Log LOG = LogFactory.getLog(MiniMRYarnCluster.class);
private JobHistoryServer historyServer;
private JobHistoryServerWrapper historyServerWrapper;
private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
public MiniMRYarnCluster(String testName) {
this(testName, 1);
@ -167,8 +168,24 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
// which shuffle doesn't happen
//configure the shuffle service in NM
String[] nmAuxServices = conf.getStrings(YarnConfiguration.NM_AUX_SERVICES);
// if need to enable TIMELINE_AUX_SERVICE_NAME
boolean enableTimelineAuxService = false;
if (nmAuxServices != null) {
for (String nmAuxService: nmAuxServices) {
if (nmAuxService == TIMELINE_AUX_SERVICE_NAME) {
enableTimelineAuxService = true;
break;
}
}
}
if (enableTimelineAuxService) {
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, TIMELINE_AUX_SERVICE_NAME });
} else {
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
}
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
Service.class);