From b50a6d78f5e12cfb9e0f52e0af6efbab3618e2e5 Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Tue, 21 Apr 2015 16:31:33 -0700 Subject: [PATCH] MAPREDUCE-6327. Made MR AM use timeline service v2 API to write history events and counters. Contributed by Junping Du. --- .../jobhistory/JobHistoryEventHandler.java | 236 ++++++++++++++++-- .../hadoop/mapreduce/v2/app/MRAppMaster.java | 23 ++ .../v2/app/rm/RMContainerAllocator.java | 9 + .../mapreduce/jobhistory/TestEvents.java | 8 +- .../TestJobHistoryEventHandler.java | 9 +- .../apache/hadoop/mapreduce/MRJobConfig.java | 5 + .../mapreduce/jobhistory/AMStartedEvent.java | 18 ++ .../mapreduce/jobhistory/HistoryEvent.java | 4 + .../jobhistory/JobFinishedEvent.java | 25 ++ .../jobhistory/JobInfoChangeEvent.java | 11 + .../mapreduce/jobhistory/JobInitedEvent.java | 14 ++ .../jobhistory/JobPriorityChangeEvent.java | 10 + .../jobhistory/JobQueueChangeEvent.java | 10 + .../jobhistory/JobStatusChangedEvent.java | 10 + .../jobhistory/JobSubmittedEvent.java | 23 ++ .../JobUnsuccessfulCompletionEvent.java | 16 ++ .../jobhistory/MapAttemptFinishedEvent.java | 24 +- .../jobhistory/NormalizedResourceEvent.java | 11 + .../ReduceAttemptFinishedEvent.java | 25 +- .../jobhistory/TaskAttemptFinishedEvent.java | 19 ++ .../jobhistory/TaskAttemptStartedEvent.java | 18 ++ ...askAttemptUnsuccessfulCompletionEvent.java | 24 ++ .../mapreduce/jobhistory/TaskFailedEvent.java | 19 ++ .../jobhistory/TaskFinishedEvent.java | 18 ++ .../jobhistory/TaskStartedEvent.java | 12 + .../jobhistory/TaskUpdatedEvent.java | 10 + .../mapreduce/util/JobHistoryEventUtils.java | 51 ++++ .../src/main/resources/mapred-default.xml | 7 + .../mapred/TestMRTimelineEventHandling.java | 163 +++++++++++- .../mapreduce/v2/MiniMRYarnCluster.java | 21 +- 30 files changed, 826 insertions(+), 27 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index b6c030021ee..e2153ce152f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -19,6 +19,9 @@ package org.apache.hadoop.mapreduce.jobhistory; import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -50,11 +53,13 @@ import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; +import org.apache.hadoop.mapreduce.v2.app.MRAppMaster; import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; @@ -76,6 +81,8 @@ import org.codehaus.jackson.node.ArrayNode; import org.codehaus.jackson.node.JsonNodeFactory; import org.codehaus.jackson.node.ObjectNode; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import com.google.common.annotations.VisibleForTesting; import com.sun.jersey.api.client.ClientHandlerException; @@ -126,14 +133,24 @@ public class JobHistoryEventHandler extends AbstractService protected static final Map fileMap = Collections.synchronizedMap(new HashMap()); + + // For posting entities in new timeline service in a non-blocking way + // TODO YARN-3367 replace with event loop in TimelineClient. + private static ExecutorService threadPool = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") + .build()); // should job completion be force when the AM shuts down? protected volatile boolean forceJobCompletion = false; protected TimelineClient timelineClient; + + private boolean newTimelineServiceEnabled = false; private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB"; private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK"; + private static String MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE = "MAPREDUCE_TASK_ATTEMPT"; public JobHistoryEventHandler(AppContext context, int startCount) { super("JobHistoryEventHandler"); @@ -253,13 +270,22 @@ public class JobHistoryEventHandler extends AbstractService MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD); + // TODO replace MR specific configurations on timeline service with getting + // configuration from RM through registerApplicationMaster() in + // ApplicationMasterProtocol with return value for timeline service + // configuration status: off, on_with_v1 or on_with_v2. if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) { if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { - timelineClient = TimelineClient.createTimelineClient(); + + timelineClient = + ((MRAppMaster.RunningAppContext)context).getTimelineClient(); timelineClient.init(conf); - LOG.info("Timeline service is enabled"); + newTimelineServiceEnabled = conf.getBoolean( + MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, + MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED); + LOG.info("Timeline service is enabled: " + (newTimelineServiceEnabled? "v2" : "v1")); LOG.info("Emitting job history data to the timeline server is enabled"); } else { LOG.info("Timeline service is not enabled"); @@ -433,9 +459,26 @@ public class JobHistoryEventHandler extends AbstractService if (timelineClient != null) { timelineClient.stop(); } + shutdownAndAwaitTermination(); LOG.info("Stopped JobHistoryEventHandler. super.stop()"); super.serviceStop(); } + + // TODO remove threadPool after adding non-blocking call in TimelineClient + private static void shutdownAndAwaitTermination() { + threadPool.shutdown(); + try { + if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { + threadPool.shutdownNow(); + if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) + LOG.error("ThreadPool did not terminate"); + } + } catch (InterruptedException ie) { + threadPool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } protected EventWriter createEventWriter(Path historyFilePath) throws IOException { @@ -590,8 +633,13 @@ public class JobHistoryEventHandler extends AbstractService processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID()); if (timelineClient != null) { - processEventForTimelineServer(historyEvent, event.getJobID(), - event.getTimestamp()); + if (newTimelineServiceEnabled) { + processEventForNewTimelineService(historyEvent, event.getJobID(), + event.getTimestamp()); + } else { + processEventForTimelineServer(historyEvent, event.getJobID(), + event.getTimestamp()); + } } if (LOG.isDebugEnabled()) { LOG.debug("In HistoryEventHandler " @@ -832,11 +880,11 @@ public class JobHistoryEventHandler extends AbstractService tEvent.addEventInfo("FINISHED_MAPS", jfe.getFinishedMaps()); tEvent.addEventInfo("FINISHED_REDUCES", jfe.getFinishedReduces()); tEvent.addEventInfo("MAP_COUNTERS_GROUPS", - countersToJSON(jfe.getMapCounters())); + JobHistoryEventUtils.countersToJSON(jfe.getMapCounters())); tEvent.addEventInfo("REDUCE_COUNTERS_GROUPS", - countersToJSON(jfe.getReduceCounters())); + JobHistoryEventUtils.countersToJSON(jfe.getReduceCounters())); tEvent.addEventInfo("TOTAL_COUNTERS_GROUPS", - countersToJSON(jfe.getTotalCounters())); + JobHistoryEventUtils.countersToJSON(jfe.getTotalCounters())); tEvent.addEventInfo("JOB_STATUS", JobState.SUCCEEDED.toString()); tEntity.addEvent(tEvent); tEntity.setEntityId(jobId.toString()); @@ -862,7 +910,7 @@ public class JobHistoryEventHandler extends AbstractService tfe.getFailedAttemptID() == null ? "" : tfe.getFailedAttemptID().toString()); tEvent.addEventInfo("COUNTERS_GROUPS", - countersToJSON(tfe.getCounters())); + JobHistoryEventUtils.countersToJSON(tfe.getCounters())); tEntity.addEvent(tEvent); tEntity.setEntityId(tfe.getTaskId().toString()); tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE); @@ -880,7 +928,7 @@ public class JobHistoryEventHandler extends AbstractService TaskFinishedEvent tfe2 = (TaskFinishedEvent) event; tEvent.addEventInfo("TASK_TYPE", tfe2.getTaskType().toString()); tEvent.addEventInfo("COUNTERS_GROUPS", - countersToJSON(tfe2.getCounters())); + JobHistoryEventUtils.countersToJSON(tfe2.getCounters())); tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime()); tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString()); tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID", @@ -902,7 +950,6 @@ public class JobHistoryEventHandler extends AbstractService tEvent.addEventInfo("START_TIME", tase.getStartTime()); tEvent.addEventInfo("HTTP_PORT", tase.getHttpPort()); tEvent.addEventInfo("TRACKER_NAME", tase.getTrackerName()); - tEvent.addEventInfo("TASK_TYPE", tase.getTaskType().toString()); tEvent.addEventInfo("SHUFFLE_PORT", tase.getShufflePort()); tEvent.addEventInfo("CONTAINER_ID", tase.getContainerId() == null ? "" : tase.getContainerId().toString()); @@ -935,7 +982,7 @@ public class JobHistoryEventHandler extends AbstractService tEvent.addEventInfo("SORT_FINISH_TIME", tauce.getFinishTime()); tEvent.addEventInfo("MAP_FINISH_TIME", tauce.getFinishTime()); tEvent.addEventInfo("COUNTERS_GROUPS", - countersToJSON(tauce.getCounters())); + JobHistoryEventUtils.countersToJSON(tauce.getCounters())); tEntity.addEvent(tEvent); tEntity.setEntityId(tauce.getTaskId().toString()); tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE); @@ -949,7 +996,7 @@ public class JobHistoryEventHandler extends AbstractService tEvent.addEventInfo("STATE", mafe.getState()); tEvent.addEventInfo("MAP_FINISH_TIME", mafe.getMapFinishTime()); tEvent.addEventInfo("COUNTERS_GROUPS", - countersToJSON(mafe.getCounters())); + JobHistoryEventUtils.countersToJSON(mafe.getCounters())); tEvent.addEventInfo("HOSTNAME", mafe.getHostname()); tEvent.addEventInfo("PORT", mafe.getPort()); tEvent.addEventInfo("RACK_NAME", mafe.getRackName()); @@ -971,7 +1018,7 @@ public class JobHistoryEventHandler extends AbstractService tEvent.addEventInfo("SHUFFLE_FINISH_TIME", rafe.getShuffleFinishTime()); tEvent.addEventInfo("SORT_FINISH_TIME", rafe.getSortFinishTime()); tEvent.addEventInfo("COUNTERS_GROUPS", - countersToJSON(rafe.getCounters())); + JobHistoryEventUtils.countersToJSON(rafe.getCounters())); tEvent.addEventInfo("HOSTNAME", rafe.getHostname()); tEvent.addEventInfo("PORT", rafe.getPort()); tEvent.addEventInfo("RACK_NAME", rafe.getRackName()); @@ -990,7 +1037,7 @@ public class JobHistoryEventHandler extends AbstractService tEvent.addEventInfo("STATUS", tafe.getTaskStatus()); tEvent.addEventInfo("STATE", tafe.getState()); tEvent.addEventInfo("COUNTERS_GROUPS", - countersToJSON(tafe.getCounters())); + JobHistoryEventUtils.countersToJSON(tafe.getCounters())); tEvent.addEventInfo("HOSTNAME", tafe.getHostname()); tEntity.addEvent(tEvent); tEntity.setEntityId(tafe.getTaskId().toString()); @@ -1017,7 +1064,7 @@ public class JobHistoryEventHandler extends AbstractService default: break; } - + try { TimelinePutResponse response = timelineClient.putEntities(tEntity); List errors = response.getErrors(); @@ -1060,6 +1107,165 @@ public class JobHistoryEventHandler extends AbstractService return nodes; } + private void putEntityWithoutBlocking(final TimelineClient timelineClient, + final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity) { + Runnable publishWrapper = new Runnable() { + public void run() { + try { + timelineClient.putEntities(entity); + } catch (IOException|YarnException e) { + LOG.error("putEntityNonBlocking get failed: " + e); + throw new RuntimeException(e.toString()); + } + } + }; + threadPool.execute(publishWrapper); + } + + // create JobEntity from HistoryEvent with adding other info, like: + // jobId, timestamp and entityType. + private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity + createJobEntity(HistoryEvent event, long timestamp, JobId jobId, + String entityType) { + + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + createBaseEntity(event, timestamp, entityType); + entity.setId(jobId.toString()); + return entity; + } + + // create BaseEntity from HistoryEvent with adding other info, like: + // timestamp and entityType. + private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity + createBaseEntity(HistoryEvent event, long timestamp, String entityType) { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent tEvent = + event.toTimelineEvent(); + tEvent.setTimestamp(timestamp); + + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); + entity.addEvent(tEvent); + entity.setType(entityType); + return entity; + } + + // create TaskEntity from HistoryEvent with adding other info, like: + // taskId, jobId, timestamp, entityType and relatedJobEntity. + private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity + createTaskEntity(HistoryEvent event, long timestamp, String taskId, + String entityType, String relatedJobEntity, JobId jobId) { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + createBaseEntity(event, timestamp, entityType); + entity.setId(taskId); + entity.addIsRelatedToEntity(relatedJobEntity, jobId.toString()); + return entity; + } + + // create TaskAttemptEntity from HistoryEvent with adding other info, like: + // timestamp, taskAttemptId, entityType, relatedTaskEntity and taskId. + private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity + createTaskAttemptEntity(HistoryEvent event, long timestamp, + String taskAttemptId, String entityType, String relatedTaskEntity, + String taskId) { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = + createBaseEntity(event, timestamp, entityType); + entity.setId(taskAttemptId); + entity.addIsRelatedToEntity(relatedTaskEntity, taskId); + return entity; + } + + private void processEventForNewTimelineService(HistoryEvent event, JobId jobId, + long timestamp) { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity tEntity = null; + String taskId = null; + String taskAttemptId = null; + + switch (event.getEventType()) { + // Handle job events + case JOB_SUBMITTED: + case JOB_STATUS_CHANGED: + case JOB_INFO_CHANGED: + case JOB_INITED: + case JOB_PRIORITY_CHANGED: + case JOB_QUEUE_CHANGED: + case JOB_FAILED: + case JOB_KILLED: + case JOB_ERROR: + case JOB_FINISHED: + case AM_STARTED: + case NORMALIZED_RESOURCE: + break; + // Handle task events + case TASK_STARTED: + taskId = ((TaskStartedEvent)event).getTaskId().toString(); + break; + case TASK_FAILED: + taskId = ((TaskFailedEvent)event).getTaskId().toString(); + break; + case TASK_UPDATED: + taskId = ((TaskUpdatedEvent)event).getTaskId().toString(); + break; + case TASK_FINISHED: + taskId = ((TaskFinishedEvent)event).getTaskId().toString(); + break; + case MAP_ATTEMPT_STARTED: + case CLEANUP_ATTEMPT_STARTED: + case REDUCE_ATTEMPT_STARTED: + case SETUP_ATTEMPT_STARTED: + taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString(); + taskAttemptId = ((TaskAttemptStartedEvent)event). + getTaskAttemptId().toString(); + break; + case MAP_ATTEMPT_FAILED: + case CLEANUP_ATTEMPT_FAILED: + case REDUCE_ATTEMPT_FAILED: + case SETUP_ATTEMPT_FAILED: + case MAP_ATTEMPT_KILLED: + case CLEANUP_ATTEMPT_KILLED: + case REDUCE_ATTEMPT_KILLED: + case SETUP_ATTEMPT_KILLED: + taskId = ((TaskAttemptUnsuccessfulCompletionEvent)event).getTaskId().toString(); + taskAttemptId = ((TaskAttemptUnsuccessfulCompletionEvent)event). + getTaskAttemptId().toString(); + break; + case MAP_ATTEMPT_FINISHED: + taskId = ((MapAttemptFinishedEvent)event).getTaskId().toString(); + taskAttemptId = ((MapAttemptFinishedEvent)event).getAttemptId().toString(); + break; + case REDUCE_ATTEMPT_FINISHED: + taskId = ((ReduceAttemptFinishedEvent)event).getTaskId().toString(); + taskAttemptId = ((ReduceAttemptFinishedEvent)event).getAttemptId().toString(); + break; + case SETUP_ATTEMPT_FINISHED: + case CLEANUP_ATTEMPT_FINISHED: + taskId = ((TaskAttemptFinishedEvent)event).getTaskId().toString(); + taskAttemptId = ((TaskAttemptFinishedEvent)event).getAttemptId().toString(); + break; + default: + LOG.warn("EventType: " + event.getEventType() + " cannot be recognized" + + " and handled by timeline service."); + return; + } + if (taskId == null) { + // JobEntity + tEntity = createJobEntity(event, timestamp, jobId, + MAPREDUCE_JOB_ENTITY_TYPE); + } else { + if (taskAttemptId == null) { + // TaskEntity + tEntity = createTaskEntity(event, timestamp, taskId, + MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE, jobId); + } else { + // TaskAttemptEntity + tEntity = createTaskAttemptEntity(event, timestamp, taskAttemptId, + MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE, MAPREDUCE_TASK_ENTITY_TYPE, + taskId); + } + } + + putEntityWithoutBlocking(timelineClient, tEntity); + } + private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) { Counter slotMillisMapCounter = allCounters diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index c5070f3a924..a5f391387a2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -139,6 +139,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -1065,6 +1066,7 @@ public class MRAppMaster extends CompositeService { private final Configuration conf; private final ClusterInfo clusterInfo = new ClusterInfo(); private final ClientToAMTokenSecretManager clientToAMTokenSecretManager; + private TimelineClient timelineClient = null; private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor; @@ -1074,6 +1076,23 @@ public class MRAppMaster extends CompositeService { this.clientToAMTokenSecretManager = new ClientToAMTokenSecretManager(appAttemptID, null); this.taskAttemptFinishingMonitor = taskAttemptFinishingMonitor; + if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, + MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA) + && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { + + boolean newTimelineServiceEnabled = conf.getBoolean( + MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, + MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED); + + if (newTimelineServiceEnabled) { + // create new version TimelineClient + timelineClient = TimelineClient.createTimelineClient( + appAttemptID.getApplicationId()); + } else { + timelineClient = TimelineClient.createTimelineClient(); + } + } } @Override @@ -1164,6 +1183,10 @@ public class MRAppMaster extends CompositeService { return taskAttemptFinishingMonitor; } + // Get Timeline Collector's address (get sync from RM) + public TimelineClient getTimelineClient() { + return timelineClient; + } } @SuppressWarnings("unchecked") diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index eb847d6978a..e1fc84fd205 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.MRAppMaster; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; @@ -821,6 +822,14 @@ public class RMContainerAllocator extends RMContainerRequestor handleUpdatedNodes(response); handleJobPriorityChange(response); + String collectorAddr = response.getCollectorAddr(); + MRAppMaster.RunningAppContext appContext = + (MRAppMaster.RunningAppContext)this.getContext(); + if (collectorAddr != null && !collectorAddr.isEmpty() + && appContext.getTimelineClient() != null) { + appContext.getTimelineClient().setTimelineServiceAddress( + response.getCollectorAddr()); + } for (ContainerStatus cont : finishedContainers) { LOG.info("Received completed container " + cont.getContainerId()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java index 7612ceb0718..e7d50067fce 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java @@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.junit.Test; public class TestEvents { @@ -404,7 +405,12 @@ public class TestEvents { public void setDatum(Object datum) { this.datum = datum; } - + + @Override + public TimelineEvent toTimelineEvent() { + return null; + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java index 8ca386ed703..c173461f476 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java @@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.job.Job; @@ -669,7 +670,7 @@ public class TestJobHistoryEventHandler { group2.addCounter("MARTHA_JONES", "Martha Jones", 3); group2.addCounter("DONNA_NOBLE", "Donna Noble", 2); group2.addCounter("ROSE_TYLER", "Rose Tyler", 1); - JsonNode jsonNode = jheh.countersToJSON(counters); + JsonNode jsonNode = JobHistoryEventUtils.countersToJSON(counters); String jsonStr = new ObjectMapper().writeValueAsString(jsonNode); String expected = "[{\"NAME\":\"COMPANIONS\",\"DISPLAY_NAME\":\"Companions " + "of the Doctor\",\"COUNTERS\":[{\"NAME\":\"AMY_POND\",\"DISPLAY_NAME\"" @@ -692,19 +693,19 @@ public class TestJobHistoryEventHandler { public void testCountersToJSONEmpty() throws Exception { JobHistoryEventHandler jheh = new JobHistoryEventHandler(null, 0); Counters counters = null; - JsonNode jsonNode = jheh.countersToJSON(counters); + JsonNode jsonNode = JobHistoryEventUtils.countersToJSON(counters); String jsonStr = new ObjectMapper().writeValueAsString(jsonNode); String expected = "[]"; Assert.assertEquals(expected, jsonStr); counters = new Counters(); - jsonNode = jheh.countersToJSON(counters); + jsonNode = JobHistoryEventUtils.countersToJSON(counters); jsonStr = new ObjectMapper().writeValueAsString(jsonNode); expected = "[]"; Assert.assertEquals(expected, jsonStr); counters.addGroup("DOCTORS", "Incarnations of the Doctor"); - jsonNode = jheh.countersToJSON(counters); + jsonNode = JobHistoryEventUtils.countersToJSON(counters); jsonStr = new ObjectMapper().writeValueAsString(jsonNode); expected = "[{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\":\"Incarnations of the " + "Doctor\",\"COUNTERS\":[]}]"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index be47bfcda8a..230c4aa2b01 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -474,6 +474,11 @@ public interface MRJobConfig { "mapreduce.job.emit-timeline-data"; public static final boolean DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA = false; + + public static final String MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED = + "mapreduce.job.new-timeline-service.enabled"; + public static final boolean DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED = + false; public static final String MR_PREFIX = "yarn.app.mapreduce."; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java index 266aa94f0cb..f98dee5f4d0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java @@ -20,8 +20,10 @@ package org.apache.hadoop.mapreduce.jobhistory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.avro.util.Utf8; @@ -166,4 +168,20 @@ public class AMStartedEvent implements HistoryEvent { public EventType getEventType() { return EventType.AM_STARTED; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("APPLICATION_ATTEMPT_ID", + getAppAttemptId() == null ? "" : getAppAttemptId().toString()); + tEvent.addInfo("CONTAINER_ID", getContainerId() == null ? + "" : getContainerId().toString()); + tEvent.addInfo("NODE_MANAGER_HOST", getNodeManagerHost()); + tEvent.addInfo("NODE_MANAGER_PORT", getNodeManagerPort()); + tEvent.addInfo("NODE_MANAGER_HTTP_PORT", getNodeManagerHttpPort()); + tEvent.addInfo("START_TIME", getStartTime()); + return tEvent; + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java index a30748cd651..61ce2170343 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java @@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.jobhistory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; /** * Interface for event wrapper classes. Implementations each wrap an @@ -37,4 +38,7 @@ public interface HistoryEvent { /** Set the Avro datum wrapped by this. */ void setDatum(Object datum); + + /** Map HistoryEvent to TimelineEvent */ + TimelineEvent toTimelineEvent(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java index 0fa5868bcfb..80d3ee64ffe 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java @@ -23,6 +23,9 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; /** * Event to record successful completion of job @@ -133,4 +136,26 @@ public class JobFinishedEvent implements HistoryEvent { public Counters getReduceCounters() { return reduceCounters; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("NUM_MAPS", getFinishedMaps()); + tEvent.addInfo("NUM_REDUCES", getFinishedReduces()); + tEvent.addInfo("FAILED_MAPS", getFailedMaps()); + tEvent.addInfo("FAILED_REDUCES", getFailedReduces()); + tEvent.addInfo("FINISHED_MAPS", getFinishedMaps()); + tEvent.addInfo("FINISHED_REDUCES", getFinishedReduces()); + tEvent.addInfo("MAP_COUNTERS_GROUPS", + JobHistoryEventUtils.countersToJSON(getMapCounters())); + tEvent.addInfo("REDUCE_COUNTERS_GROUPS", + JobHistoryEventUtils.countersToJSON(getReduceCounters())); + tEvent.addInfo("TOTAL_COUNTERS_GROUPS", + JobHistoryEventUtils.countersToJSON(getTotalCounters())); + // TODO replace SUCCEEDED with JobState.SUCCEEDED.toString() + tEvent.addInfo("JOB_STATUS", "SUCCEEDED"); + return tEvent; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java index b45f373cd0f..ad8244370ba 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java @@ -23,6 +23,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.avro.util.Utf8; @@ -64,5 +66,14 @@ public class JobInfoChangeEvent implements HistoryEvent { public EventType getEventType() { return EventType.JOB_INFO_CHANGED; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("SUBMIT_TIME", getSubmitTime()); + tEvent.addInfo("LAUNCH_TIME", getLaunchTime()); + return tEvent; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java index 5abb40e6bb1..3e0f2f7eb83 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java @@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.jobhistory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.avro.util.Utf8; @@ -73,4 +75,16 @@ public class JobInitedEvent implements HistoryEvent { } /** Get whether the job's map and reduce stages were combined */ public boolean getUberized() { return datum.getUberized(); } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("START_TIME", getLaunchTime()); + tEvent.addInfo("STATUS", getStatus()); + tEvent.addInfo("TOTAL_MAPS", getTotalMaps()); + tEvent.addInfo("TOTAL_REDUCES", getTotalReduces()); + tEvent.addInfo("UBERIZED", getUberized()); + return tEvent; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java index 6c324628c13..5deea0aff36 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java @@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapred.JobPriority; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.avro.util.Utf8; @@ -64,5 +66,13 @@ public class JobPriorityChangeEvent implements HistoryEvent { public EventType getEventType() { return EventType.JOB_PRIORITY_CHANGED; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("PRIORITY", getPriority().toString()); + return tEvent; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java index 86078e6b12c..b9dd3592242 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java @@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.jobhistory; import org.apache.avro.util.Utf8; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; @SuppressWarnings("deprecation") public class JobQueueChangeEvent implements HistoryEvent { @@ -59,5 +61,13 @@ public class JobQueueChangeEvent implements HistoryEvent { } return null; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("QUEUE_NAMES", getJobQueueName()); + return tEvent; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java index 9e11a55256c..a4f2da2226c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java @@ -23,6 +23,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.avro.util.Utf8; @@ -60,5 +62,13 @@ public class JobStatusChangedEvent implements HistoryEvent { public EventType getEventType() { return EventType.JOB_STATUS_CHANGED; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("STATUS", getStatus()); + return tEvent; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java index 845f6f7c253..47b284006da 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java @@ -27,6 +27,8 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.avro.util.Utf8; @@ -205,5 +207,26 @@ public class JobSubmittedEvent implements HistoryEvent { } /** Get the event type */ public EventType getEventType() { return EventType.JOB_SUBMITTED; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("SUBMIT_TIME", getSubmitTime()); + tEvent.addInfo("QUEUE_NAME", getJobQueueName()); + tEvent.addInfo("JOB_NAME", getJobName()); + tEvent.addInfo("USER_NAME", getUserName()); + tEvent.addInfo("JOB_CONF_PATH", getJobConfPath()); + tEvent.addInfo("ACLS", getJobAcls()); + tEvent.addInfo("JOB_QUEUE_NAME", getJobQueueName()); + tEvent.addInfo("WORKLFOW_ID", getWorkflowId()); + tEvent.addInfo("WORKFLOW_NAME", getWorkflowName()); + tEvent.addInfo("WORKFLOW_NODE_NAME", getWorkflowNodeName()); + tEvent.addInfo("WORKFLOW_ADJACENCIES", + getWorkflowAdjacencies()); + tEvent.addInfo("WORKFLOW_TAGS", getWorkflowTags()); + + return tEvent; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java index 1b7773d6ba0..ea9798c0334 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java @@ -24,6 +24,8 @@ import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import java.util.Collections; @@ -119,4 +121,18 @@ public class JobUnsuccessfulCompletionEvent implements HistoryEvent { final CharSequence diagnostics = datum.getDiagnostics(); return diagnostics == null ? NODIAGS : diagnostics.toString(); } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("NUM_MAPS", getFinishedMaps()); + tEvent.addInfo("NUM_REDUCES", getFinishedReduces()); + tEvent.addInfo("JOB_STATUS", getStatus()); + tEvent.addInfo("DIAGNOSTICS", getDiagnostics()); + tEvent.addInfo("FINISHED_MAPS", getFinishedMaps()); + tEvent.addInfo("FINISHED_REDUCES", getFinishedReduces()); + return tEvent; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java index bc046c7eb4b..36737e997b5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java @@ -26,6 +26,9 @@ import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; /** * Event to record successful completion of a map attempt @@ -33,7 +36,7 @@ import org.apache.hadoop.mapreduce.TaskType; */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class MapAttemptFinishedEvent implements HistoryEvent { +public class MapAttemptFinishedEvent implements HistoryEvent { private MapAttemptFinished datum = null; @@ -218,4 +221,23 @@ public class MapAttemptFinishedEvent implements HistoryEvent { return physMemKbytes; } + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("STATUS", getTaskStatus()); + tEvent.addInfo("STATE", getState()); + tEvent.addInfo("MAP_FINISH_TIME", getMapFinishTime()); + tEvent.addInfo("COUNTERS_GROUPS", + JobHistoryEventUtils.countersToJSON(getCounters())); + tEvent.addInfo("HOSTNAME", getHostname()); + tEvent.addInfo("PORT", getPort()); + tEvent.addInfo("RACK_NAME", getRackName()); + tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ? + "" : getAttemptId().toString()); + return tEvent; + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java index eead9cf03df..95a2e36ba37 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java @@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.jobhistory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; /** * Event to record the normalized map/reduce requirements. @@ -71,4 +73,13 @@ public class NormalizedResourceEvent implements HistoryEvent { public void setDatum(Object datum) { throw new UnsupportedOperationException("Not a seriable object"); } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("MEMORY", "" + getMemory()); + tEvent.addInfo("TASK_TYPE", getTaskType()); + return tEvent; + } } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java index 6644a48f1e8..6087c7a1a16 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java @@ -26,6 +26,9 @@ import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; /** * Event to record successful completion of a reduce attempt @@ -33,7 +36,7 @@ import org.apache.hadoop.mapreduce.TaskType; */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class ReduceAttemptFinishedEvent implements HistoryEvent { +public class ReduceAttemptFinishedEvent implements HistoryEvent { private ReduceAttemptFinished datum = null; @@ -222,5 +225,25 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent { public int[] getPhysMemKbytes() { return physMemKbytes; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ? + "" : getAttemptId().toString()); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("STATUS", getTaskStatus()); + tEvent.addInfo("STATE", getState()); + tEvent.addInfo("SHUFFLE_FINISH_TIME", getShuffleFinishTime()); + tEvent.addInfo("SORT_FINISH_TIME", getSortFinishTime()); + tEvent.addInfo("COUNTERS_GROUPS", + JobHistoryEventUtils.countersToJSON(getCounters())); + tEvent.addInfo("HOSTNAME", getHostname()); + tEvent.addInfo("PORT", getPort()); + tEvent.addInfo("RACK_NAME", getRackName()); + return tEvent; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java index bb7dbe00fd8..c7c43876320 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java @@ -25,6 +25,9 @@ import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; /** * Event to record successful task completion @@ -135,5 +138,21 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { ? EventType.MAP_ATTEMPT_FINISHED : EventType.REDUCE_ATTEMPT_FINISHED; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ? + "" : getAttemptId().toString()); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("STATUS", getTaskStatus()); + tEvent.addInfo("STATE", getState()); + tEvent.addInfo("COUNTERS_GROUPS", + JobHistoryEventUtils.countersToJSON(getCounters())); + tEvent.addInfo("HOSTNAME", getHostname()); + return tEvent; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java index 3073d5b95f1..61c5178f9a9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java @@ -23,8 +23,10 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.avro.util.Utf8; @@ -132,5 +134,21 @@ public class TaskAttemptStartedEvent implements HistoryEvent { } return null; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("TASK_ATTEMPT_ID", + getTaskAttemptId().toString()); + tEvent.addInfo("START_TIME", getStartTime()); + tEvent.addInfo("HTTP_PORT", getHttpPort()); + tEvent.addInfo("TRACKER_NAME", getTrackerName()); + tEvent.addInfo("SHUFFLE_PORT", getShufflePort()); + tEvent.addInfo("CONTAINER_ID", getContainerId() == null ? + "" : getContainerId().toString()); + return tEvent; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java index 77ee2a031ad..0bb13580638 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java @@ -25,6 +25,9 @@ import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.mapred.ProgressSplitsBlock; @@ -247,5 +250,26 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent { public int[] getPhysMemKbytes() { return physMemKbytes; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("TASK_ATTEMPT_ID", getTaskAttemptId() == null ? + "" : getTaskAttemptId().toString()); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("ERROR", getError()); + tEvent.addInfo("STATUS", getTaskStatus()); + tEvent.addInfo("HOSTNAME", getHostname()); + tEvent.addInfo("PORT", getPort()); + tEvent.addInfo("RACK_NAME", getRackName()); + tEvent.addInfo("SHUFFLE_FINISH_TIME", getFinishTime()); + tEvent.addInfo("SORT_FINISH_TIME", getFinishTime()); + tEvent.addInfo("MAP_FINISH_TIME", getFinishTime()); + tEvent.addInfo("COUNTERS_GROUPS", + JobHistoryEventUtils.countersToJSON(getCounters())); + return tEvent; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java index 2838f08cd3c..5e82dea2dbc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java @@ -20,10 +20,14 @@ package org.apache.hadoop.mapreduce.jobhistory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.avro.util.Utf8; @@ -136,5 +140,20 @@ public class TaskFailedEvent implements HistoryEvent { public EventType getEventType() { return EventType.TASK_FAILED; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("STATUS", TaskStatus.State.FAILED.toString()); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("ERROR", getError()); + tEvent.addInfo("FAILED_ATTEMPT_ID", + getFailedAttemptID() == null ? "" : getFailedAttemptID().toString()); + tEvent.addInfo("COUNTERS_GROUPS", + JobHistoryEventUtils.countersToJSON(getCounters())); + return tEvent; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java index d4ec74db0ad..e359e32e70b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java @@ -21,10 +21,14 @@ package org.apache.hadoop.mapreduce.jobhistory; import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; /** * Event to record the successful completion of a task @@ -115,5 +119,19 @@ public class TaskFinishedEvent implements HistoryEvent { return EventType.TASK_FINISHED; } + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("COUNTERS_GROUPS", + JobHistoryEventUtils.countersToJSON(getCounters())); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + tEvent.addInfo("STATUS", TaskStatus.State.SUCCEEDED.toString()); + tEvent.addInfo("SUCCESSFUL_TASK_ATTEMPT_ID", + getSuccessfulTaskAttemptId() == null ? "" : + getSuccessfulTaskAttemptId().toString()); + return tEvent; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java index ed53b030289..d1b97bf5b6a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java @@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; /** * Event to record the start of a task @@ -71,5 +73,15 @@ public class TaskStartedEvent implements HistoryEvent { public EventType getEventType() { return EventType.TASK_STARTED; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("TASK_TYPE", getTaskType().toString()); + tEvent.addInfo("START_TIME", getStartTime()); + tEvent.addInfo("SPLIT_LOCATIONS", getSplitLocations()); + return tEvent; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java index 58f414330d8..b9a389cf6ab 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java @@ -23,6 +23,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.avro.util.Utf8; @@ -60,5 +62,13 @@ public class TaskUpdatedEvent implements HistoryEvent { public EventType getEventType() { return EventType.TASK_UPDATED; } + + @Override + public TimelineEvent toTimelineEvent() { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(StringUtils.toUpperCase(getEventType().name())); + tEvent.addInfo("FINISH_TIME", getFinishTime()); + return tEvent; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java new file mode 100644 index 00000000000..f4896ff1e10 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java @@ -0,0 +1,51 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.mapreduce.util; + +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; + +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.node.ArrayNode; +import org.codehaus.jackson.node.ObjectNode; + +public class JobHistoryEventUtils { + + public static JsonNode countersToJSON(Counters counters) { + ObjectMapper mapper = new ObjectMapper(); + ArrayNode nodes = mapper.createArrayNode(); + if (counters != null) { + for (CounterGroup counterGroup : counters) { + ObjectNode groupNode = nodes.addObject(); + groupNode.put("NAME", counterGroup.getName()); + groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName()); + ArrayNode countersNode = groupNode.putArray("COUNTERS"); + for (Counter counter : counterGroup) { + ObjectNode counterNode = countersNode.addObject(); + counterNode.put("NAME", counter.getName()); + counterNode.put("DISPLAY_NAME", counter.getDisplayName()); + counterNode.put("VALUE", counter.getValue()); + } + } + } + return nodes; + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index ebc43aacf0e..ef7ec36870b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -637,6 +637,13 @@ + + mapreduce.job.new-timeline-service.enabled + false + Specifies if posting job and task events to new timeline service. + + + mapreduce.input.fileinputformat.split.minsize 0 diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java index 40ed9ad1160..afc4686fd3a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java @@ -18,23 +18,46 @@ package org.apache.hadoop.mapred; +import java.io.File; +import java.io.IOException; + +import java.util.EnumSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.jobhistory.EventType; import org.apache.hadoop.mapreduce.jobhistory.TestJobHistoryEventHandler; import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.timeline.TimelineStore; +import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.Assert; import org.junit.Test; public class TestMRTimelineEventHandling { + private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector"; + private static final Log LOG = + LogFactory.getLog(TestMRTimelineEventHandling.class); + @Test public void testTimelineServiceStartInMiniCluster() throws Exception { Configuration conf = new YarnConfiguration(); @@ -48,7 +71,7 @@ public class TestMRTimelineEventHandling { MiniMRYarnCluster cluster = null; try { cluster = new MiniMRYarnCluster( - TestJobHistoryEventHandler.class.getSimpleName(), 1); + TestMRTimelineEventHandling.class.getSimpleName(), 1); cluster.init(conf); cluster.start(); @@ -89,7 +112,7 @@ public class TestMRTimelineEventHandling { MiniMRYarnCluster cluster = null; try { cluster = new MiniMRYarnCluster( - TestJobHistoryEventHandler.class.getSimpleName(), 1); + TestMRTimelineEventHandling.class.getSimpleName(), 1); cluster.init(conf); cluster.start(); conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, @@ -137,6 +160,140 @@ public class TestMRTimelineEventHandling { } } } + + @Test + public void testMRNewTimelineServiceEventHandling() throws Exception { + LOG.info("testMRNewTimelineServiceEventHandling start."); + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true); + + // enable new timeline serivce in MR side + conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, true); + + // enable aux-service based timeline collectors + conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME); + conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME + + ".class", PerNodeTimelineCollectorsAuxService.class.getName()); + + conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true); + + MiniMRYarnCluster cluster = null; + try { + cluster = new MiniMRYarnCluster( + TestMRTimelineEventHandling.class.getSimpleName(), 1, true); + cluster.init(conf); + cluster.start(); + LOG.info("A MiniMRYarnCluster get start."); + + Path inDir = new Path("input"); + Path outDir = new Path("output"); + LOG.info("Run 1st job which should be successful."); + RunningJob job = + UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir); + Assert.assertEquals(JobStatus.SUCCEEDED, + job.getJobStatus().getState().getValue()); + + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(new Configuration(cluster.getConfig())); + yarnClient.start(); + EnumSet appStates = + EnumSet.allOf(YarnApplicationState.class); + + ApplicationId firstAppId = null; + List apps = yarnClient.getApplications(appStates); + Assert.assertEquals(apps.size(), 1); + ApplicationReport appReport = apps.get(0); + firstAppId = appReport.getApplicationId(); + + checkNewTimelineEvent(firstAppId); + + LOG.info("Run 2nd job which should be failed."); + job = UtilsForTests.runJobFail(new JobConf(conf), inDir, outDir); + Assert.assertEquals(JobStatus.FAILED, + job.getJobStatus().getState().getValue()); + + apps = yarnClient.getApplications(appStates); + Assert.assertEquals(apps.size(), 2); + + ApplicationId secAppId = null; + secAppId = apps.get(0).getApplicationId() == firstAppId ? + apps.get(1).getApplicationId() : apps.get(0).getApplicationId(); + checkNewTimelineEvent(firstAppId); + + } finally { + if (cluster != null) { + cluster.stop(); + } + // Cleanup test file + String testRoot = + FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT; + File testRootFolder = new File(testRoot); + if(testRootFolder.isDirectory()) { + FileUtils.deleteDirectory(testRootFolder); + } + + } + } + + private void checkNewTimelineEvent(ApplicationId appId) throws IOException { + String tmpRoot = + FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT + + "/entities/"; + + File tmpRootFolder = new File(tmpRoot); + + Assert.assertTrue(tmpRootFolder.isDirectory()); + String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" + + UserGroupInformation.getCurrentUser().getShortUserName() + + "/" + TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) + + "/1/1/" + appId.toString(); + // for this test, we expect MAPREDUCE_JOB and MAPREDUCE_TASK dirs + String outputDirJob = basePath + "/MAPREDUCE_JOB/"; + + File entityFolder = new File(outputDirJob); + Assert.assertTrue("Job output directory: " + outputDirJob + " is not exist.", + entityFolder.isDirectory()); + + // check for job event file + String jobEventFileName = appId.toString().replaceAll("application", "job") + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + + String jobEventFilePath = outputDirJob + jobEventFileName; + File jobEventFile = new File(jobEventFilePath); + Assert.assertTrue("jobEventFilePath: " + jobEventFilePath + " is not exist.", + jobEventFile.exists()); + + // check for task event file + String outputDirTask = basePath + "/MAPREDUCE_TASK/"; + File taskFolder = new File(outputDirTask); + Assert.assertTrue("Task output directory: " + outputDirTask + " is not exist.", + taskFolder.isDirectory()); + + String taskEventFileName = appId.toString().replaceAll("application", "task") + + "_m_000000" + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + + String taskEventFilePath = outputDirTask + taskEventFileName; + File taskEventFile = new File(taskEventFilePath); + Assert.assertTrue("taskEventFileName: " + taskEventFilePath + " is not exist.", + taskEventFile.exists()); + + // check for task attempt event file + String outputDirTaskAttempt = basePath + "/MAPREDUCE_TASK_ATTEMPT/"; + File taskAttemptFolder = new File(outputDirTaskAttempt); + Assert.assertTrue("TaskAttempt output directory: " + outputDirTaskAttempt + + " is not exist.", taskAttemptFolder.isDirectory()); + + String taskAttemptEventFileName = appId.toString().replaceAll( + "application", "attempt") + "_m_000000_0" + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + + String taskAttemptEventFilePath = outputDirTaskAttempt + + taskAttemptEventFileName; + File taskAttemptEventFile = new File(taskAttemptEventFilePath); + Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath + + " is not exist.", taskAttemptEventFile.exists()); + } @Test public void testMapreduceJobTimelineServiceEnabled() @@ -147,7 +304,7 @@ public class TestMRTimelineEventHandling { MiniMRYarnCluster cluster = null; try { cluster = new MiniMRYarnCluster( - TestJobHistoryEventHandler.class.getSimpleName(), 1); + TestMRTimelineEventHandling.class.getSimpleName(), 1); cluster.init(conf); cluster.start(); conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java index cad6f3ac8a7..18a4c1431a3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java @@ -66,6 +66,7 @@ public class MiniMRYarnCluster extends MiniYARNCluster { private static final Log LOG = LogFactory.getLog(MiniMRYarnCluster.class); private JobHistoryServer historyServer; private JobHistoryServerWrapper historyServerWrapper; + private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector"; public MiniMRYarnCluster(String testName) { this(testName, 1); @@ -167,8 +168,24 @@ public class MiniMRYarnCluster extends MiniYARNCluster { conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of // which shuffle doesn't happen //configure the shuffle service in NM - conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, - new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID }); + String[] nmAuxServices = conf.getStrings(YarnConfiguration.NM_AUX_SERVICES); + // if need to enable TIMELINE_AUX_SERVICE_NAME + boolean enableTimelineAuxService = false; + if (nmAuxServices != null) { + for (String nmAuxService: nmAuxServices) { + if (nmAuxService == TIMELINE_AUX_SERVICE_NAME) { + enableTimelineAuxService = true; + break; + } + } + } + if (enableTimelineAuxService) { + conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, + new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, TIMELINE_AUX_SERVICE_NAME }); + } else { + conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, + new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID }); + } conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class, Service.class);