From 1bef05715d51c168c6fa838fda7f1359323849bc Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Tue, 25 Oct 2011 06:27:24 +0000 Subject: [PATCH] Merge -c 1188528 from trunk to branch-0.23 to complete fix for MAPREDUCE-2821. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1188529 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../jobhistory/JobHistoryEventHandler.java | 41 +++++-- .../mapreduce/jobhistory/JobSummary.java | 45 ++++---- .../v2/app/job/impl/TaskAttemptImpl.java | 9 +- .../v2/app/rm/RMContainerAllocator.java | 44 ++++++-- .../apache/hadoop/mapreduce/v2/app/MRApp.java | 13 +++ .../src/main/avro/Events.avpr | 3 +- .../mapreduce/jobhistory/HistoryEvent.java | 2 - .../JobUnsuccessfulCompletionEvent.java | 5 +- .../jobhistory/NormalizedResourceEvent.java | 74 ++++++++++++ .../ReduceAttemptFinishedEvent.java | 8 +- .../jobhistory/TaskStartedEvent.java | 5 +- .../v2/hs/TestJobHistoryParsing.java | 105 +++++++++--------- .../jobhistory/TestJobHistoryEvents.java | 5 +- 14 files changed, 240 insertions(+), 122 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 6f43ea54c11..0bccbbf1f39 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1703,6 +1703,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2746. Yarn servers can't communicate with each other with hadoop.security.authorization set to true (acmurthy via mahadev) + MAPREDUCE-2821. Added missing fields (resourcePerMap & resourcePerReduce) + to JobSummary logs. (mahadev via acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index 4b97feda2d4..07f8ecc51fd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.Counter; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; @@ -91,7 +92,8 @@ public JobHistoryEventHandler(AppContext context, int startCount) { } /* (non-Javadoc) - * @see org.apache.hadoop.yarn.service.AbstractService#init(org.apache.hadoop.conf.Configuration) + * @see org.apache.hadoop.yarn.service.AbstractService#init(org. + * apache.hadoop.conf.Configuration) * Initializes the FileSystem and Path objects for the log and done directories. * Creates these directories if they do not already exist. */ @@ -155,14 +157,15 @@ public void init(Configuration conf) { + doneDirPath + "] based on conf: " + MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR - + ". 
Either set to true or pre-create this directory with appropriate permissions"; + + ". Either set to true or pre-create this directory with" + + " appropriate permissions"; LOG.error(message); throw new YarnException(message); } } } catch (IOException e) { - LOG.error("Failed checking for the existance of history intermediate done directory: [" - + doneDirPath + "]"); + LOG.error("Failed checking for the existance of history intermediate " + + "done directory: [" + doneDirPath + "]"); throw new YarnException(e); } @@ -380,8 +383,11 @@ protected void handleEvent(JobHistoryEvent event) { MetaInfo mi = fileMap.get(event.getJobID()); try { HistoryEvent historyEvent = event.getHistoryEvent(); - mi.writeEvent(historyEvent); - processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID()); + if (! (historyEvent instanceof NormalizedResourceEvent)) { + mi.writeEvent(historyEvent); + } + processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), + event.getJobID()); LOG.info("In HistoryEventHandler " + event.getHistoryEvent().getEventType()); } catch (IOException e) { @@ -395,7 +401,7 @@ protected void handleEvent(JobHistoryEvent event) { (JobSubmittedEvent) event.getHistoryEvent(); mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime()); } - + // If this is JobFinishedEvent, close the writer and setup the job-index if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) { try { @@ -415,7 +421,8 @@ protected void handleEvent(JobHistoryEvent event) { if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED || event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) { try { - JobUnsuccessfulCompletionEvent jucEvent = (JobUnsuccessfulCompletionEvent) event + JobUnsuccessfulCompletionEvent jucEvent = + (JobUnsuccessfulCompletionEvent) event .getHistoryEvent(); mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime()); mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps()); @@ -429,7 +436,8 @@ protected void handleEvent(JobHistoryEvent event) { } } - private void processEventForJobSummary(HistoryEvent event, JobSummary summary, JobId jobId) { + public void processEventForJobSummary(HistoryEvent event, JobSummary summary, + JobId jobId) { // context.getJob could be used for some of this info as well. 
switch (event.getEventType()) { case JOB_SUBMITTED: @@ -438,6 +446,15 @@ private void processEventForJobSummary(HistoryEvent event, JobSummary summary, J summary.setQueue(jse.getJobQueueName()); summary.setJobSubmitTime(jse.getSubmitTime()); break; + case NORMALIZED_RESOURCE: + NormalizedResourceEvent normalizedResourceEvent = + (NormalizedResourceEvent) event; + if (normalizedResourceEvent.getTaskType() == TaskType.MAP) { + summary.setResourcesPerMap(normalizedResourceEvent.getMemory()); + } else if (normalizedResourceEvent.getTaskType() == TaskType.REDUCE) { + summary.setResourcesPerReduce(normalizedResourceEvent.getMemory()); + } + break; case JOB_INITED: JobInitedEvent jie = (JobInitedEvent) event; summary.setJobLaunchTime(jie.getLaunchTime()); @@ -503,7 +520,8 @@ protected void closeEventWriter(JobId jobId) throws IOException { if (!mi.isWriterActive()) { throw new IOException( - "Inactive Writer: Likely received multiple JobFinished / JobUnsuccessful events for JobId: [" + "Inactive Writer: Likely received multiple JobFinished / " + + "JobUnsuccessful events for JobId: [" + jobId + "]"); } @@ -594,7 +612,8 @@ private class MetaInfo { this.historyFile = historyFile; this.confFile = conf; this.writer = writer; - this.jobIndexInfo = new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null); + this.jobIndexInfo = new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, + null); this.jobSummary = new JobSummary(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java index b5c251072fc..691c7ee4e13 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java @@ -34,7 +34,8 @@ public class JobSummary { private int numFailedMaps; private int numFinishedReduces; private int numFailedReduces; - // private int numSlotsPerMap; | Doesn't make sense with potentially different + private int resourcesPerMap; // resources used per map/min resource + private int resourcesPerReduce; // resources used per reduce/min resource // resource models // private int numSlotsPerReduce; | Doesn't make sense with potentially // different resource models @@ -112,14 +113,14 @@ public void setNumFailedMaps(int numFailedMaps) { this.numFailedMaps = numFailedMaps; } - // public int getNumSlotsPerMap() { - // return numSlotsPerMap; - // } - // - // public void setNumSlotsPerMap(int numSlotsPerMap) { - // this.numSlotsPerMap = numSlotsPerMap; - // } - + public int getResourcesPerMap() { + return resourcesPerMap; + } + + public void setResourcesPerMap(int resourcesPerMap) { + this.resourcesPerMap = resourcesPerMap; + } + public int getNumFinishedReduces() { return numFinishedReduces; } @@ -136,14 +137,14 @@ public void setNumFailedReduces(int numFailedReduces) { this.numFailedReduces = numFailedReduces; } - // public int getNumSlotsPerReduce() { - // return numSlotsPerReduce; - // } - // - // public void setNumSlotsPerReduce(int numSlotsPerReduce) { - // this.numSlotsPerReduce = numSlotsPerReduce; - // } - + public int getResourcesPerReduce() { + return this.resourcesPerReduce; + } + + public void setResourcesPerReduce(int 
resourcesPerReduce) { + this.resourcesPerReduce = resourcesPerReduce; + } + public String getUser() { return user; } @@ -184,14 +185,6 @@ public void setReduceSlotSeconds(long reduceSlotSeconds) { this.reduceSlotSeconds = reduceSlotSeconds; } - // public int getClusterSlotCapacity() { - // return clusterSlotCapacity; - // } - // - // public void setClusterSlotCapacity(int clusterSlotCapacity) { - // this.clusterSlotCapacity = clusterSlotCapacity; - // } - public String getJobSummaryString() { SummaryBuilder summary = new SummaryBuilder() .add("jobId", jobId) @@ -200,6 +193,8 @@ public String getJobSummaryString() { .add("firstMapTaskLaunchTime", firstMapTaskLaunchTime) .add("firstReduceTaskLaunchTime", firstReduceTaskLaunchTime) .add("finishTime", jobFinishTime) + .add("resourcesPerMap", resourcesPerMap) + .add("resourcesPerReduce", resourcesPerReduce) .add("numMaps", numFinishedMaps + numFailedMaps) .add("numReduces", numFinishedReduces + numFailedReduces) .add("user", user) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index c00be48dc30..0b3187de720 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -91,12 +91,12 @@ import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent; import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent; import org.apache.hadoop.mapreduce.v2.util.MRApps; -import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; @@ -115,10 +115,10 @@ import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.RackResolver; -import org.apache.hadoop.util.StringUtils; /** @@ -856,7 +856,7 @@ private void setFinishTime() { private static long computeSlotMillis(TaskAttemptImpl taskAttempt) { TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); int slotMemoryReq = - taskAttempt.getMemoryRequired(taskAttempt.conf, taskType); + taskAttempt.getMemoryRequired(taskAttempt.conf, taskType); int simSlotsRequired = slotMemoryReq / (taskType == TaskType.MAP ? 
MAP_MEMORY_MB_DEFAULT @@ -994,7 +994,7 @@ public void transition(TaskAttemptImpl taskAttempt, private static class ContainerAssignedTransition implements SingleArcTransition { - @SuppressWarnings("unchecked") + @SuppressWarnings({ "unchecked", "deprecation" }) @Override public void transition(final TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { @@ -1164,6 +1164,7 @@ private static class TaskCleanupTransition implements @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { + @SuppressWarnings("deprecation") TaskAttemptContext taskContext = new TaskAttemptContextImpl(new JobConf(taskAttempt.conf), TypeConverter.fromYarn(taskAttempt.attemptId)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index cc916a9b514..9fa08e5b64d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -18,8 +18,6 @@ package org.apache.hadoop.mapreduce.v2.app.rm; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -37,7 +35,12 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobCounter; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; @@ -125,7 +128,7 @@ added to the pending and are ramped up (added to scheduled) based private float maxReduceRampupLimit = 0; private float maxReducePreemptionLimit = 0; private float reduceSlowStart = 0; - + public RMContainerAllocator(ClientService clientService, AppContext context) { super(clientService, context); } @@ -169,6 +172,7 @@ public void stop() { LOG.info("Final Stats: " + getStat()); } + @SuppressWarnings("unchecked") @Override public synchronized void handle(ContainerAllocatorEvent event) { LOG.info("Processing the event " + event.toString()); @@ -179,7 +183,13 @@ public synchronized void handle(ContainerAllocatorEvent event) { if (mapResourceReqt == 0) { mapResourceReqt = reqEvent.getCapability().getMemory(); int minSlotMemSize = getMinContainerCapability().getMemory(); - mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize) * minSlotMemSize; + mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize) + * minSlotMemSize; + JobID id = TypeConverter.fromYarn(applicationId); + JobId jobId = TypeConverter.toYarn(id); + eventHandler.handle(new JobHistoryEvent(jobId, + new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP, + mapResourceReqt))); LOG.info("mapResourceReqt:"+mapResourceReqt); if 
(mapResourceReqt > getMaxContainerCapability().getMemory()) { String diagMsg = "MAP capability required is more than the supported " + @@ -199,12 +209,20 @@ public synchronized void handle(ContainerAllocatorEvent event) { reduceResourceReqt = reqEvent.getCapability().getMemory(); int minSlotMemSize = getMinContainerCapability().getMemory(); //round off on slotsize - reduceResourceReqt = (int) Math.ceil((float) reduceResourceReqt/minSlotMemSize) * minSlotMemSize; + reduceResourceReqt = (int) Math.ceil((float) + reduceResourceReqt/minSlotMemSize) * minSlotMemSize; + JobID id = TypeConverter.fromYarn(applicationId); + JobId jobId = TypeConverter.toYarn(id); + eventHandler.handle(new JobHistoryEvent(jobId, + new NormalizedResourceEvent( + org.apache.hadoop.mapreduce.TaskType.REDUCE, + reduceResourceReqt))); LOG.info("reduceResourceReqt:"+reduceResourceReqt); if (reduceResourceReqt > getMaxContainerCapability().getMemory()) { - String diagMsg = "REDUCE capability required is more than the supported " + - "max container capability in the cluster. Killing the Job. reduceResourceReqt: " + - reduceResourceReqt + " maxContainerCapability:" + getMaxContainerCapability().getMemory(); + String diagMsg = "REDUCE capability required is more than the " + + "supported max container capability in the cluster. Killing the " + + "Job. reduceResourceReqt: " + reduceResourceReqt + + " maxContainerCapability:" + getMaxContainerCapability().getMemory(); LOG.info(diagMsg); eventHandler.handle(new JobDiagnosticsUpdateEvent( getJob().getID(), diagMsg)); @@ -217,7 +235,8 @@ public synchronized void handle(ContainerAllocatorEvent event) { //add to the front of queue for fail fast pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE)); } else { - pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE));//reduces are added to pending and are slowly ramped up + pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE)); + //reduces are added to pending and are slowly ramped up } } @@ -411,6 +430,7 @@ private synchronized String getStat() { " availableResources(headroom):" + getAvailableResources(); } + @SuppressWarnings("unchecked") private List getResources() throws Exception { int headRoom = getAvailableResources() != null ? 
getAvailableResources().getMemory() : 0;//first time it would be null AMResponse response = makeRemoteRequest(); @@ -538,6 +558,7 @@ void addReduce(ContainerRequest req) { addContainerReq(req); } + @SuppressWarnings("unchecked") private void assign(List allocatedContainers) { Iterator it = allocatedContainers.iterator(); LOG.info("Got allocated containers " + allocatedContainers.size()); @@ -694,6 +715,7 @@ else if (PRIORITY_REDUCE.equals(priority)) { } + @SuppressWarnings("unchecked") private ContainerRequest assignToFailedMap(Container allocated) { //try to assign to earlierFailedMaps if present ContainerRequest assigned = null; @@ -723,6 +745,7 @@ private ContainerRequest assignToReduce(Container allocated) { return assigned; } + @SuppressWarnings("unchecked") private ContainerRequest assignToMap(Container allocated) { //try to assign to maps if present //first by host, then by rack, followed by * @@ -798,7 +821,8 @@ void add(ContainerId containerId, TaskAttemptId tId) { } void preemptReduce(int toPreempt) { - List reduceList = new ArrayList(reduces.keySet()); + List reduceList = new ArrayList + (reduces.keySet()); //sort reduces on progress Collections.sort(reduceList, new Comparator() { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 0d5097ae28d..e948f56cf5d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -31,9 +31,12 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.WrappedJvmID; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; @@ -360,6 +363,16 @@ public void handle(ContainerAllocatorEvent event) { NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234); Container container = BuilderUtils.newContainer(cId, nodeId, "localhost:9999", null, null, null); + JobID id = TypeConverter.fromYarn(applicationId); + JobId jobId = TypeConverter.toYarn(id); + getContext().getEventHandler().handle(new JobHistoryEvent(jobId, + new NormalizedResourceEvent( + org.apache.hadoop.mapreduce.TaskType.REDUCE, + 100))); + getContext().getEventHandler().handle(new JobHistoryEvent(jobId, + new NormalizedResourceEvent( + org.apache.hadoop.mapreduce.TaskType.MAP, + 100))); getContext().getEventHandler().handle( new TaskAttemptContainerAssignedEvent(event.getAttemptID(), container, null)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr index e05e0c900e3..f826de03af0 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr @@ -225,7 +225,7 @@ {"name": "counters", "type": "JhCounters"} ] }, - + {"type": "record", "name": "TaskStarted", "fields": [ {"name": "taskid", "type": "string"}, @@ -256,6 +256,7 @@ "TASK_FINISHED", "TASK_FAILED", "TASK_UPDATED", + "NORMALIZED_RESOURCE", "MAP_ATTEMPT_STARTED", "MAP_ATTEMPT_FINISHED", "MAP_ATTEMPT_FAILED", diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java index 2dda8f70647..a30748cd651 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java @@ -18,8 +18,6 @@ package org.apache.hadoop.mapreduce.jobhistory; -import java.io.IOException; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java index aded4e966a5..a1c374f522b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java @@ -18,14 +18,11 @@ package org.apache.hadoop.mapreduce.jobhistory; -import java.io.IOException; - +import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.JobID; -import org.apache.avro.util.Utf8; - /** * Event to record Failed and Killed completion of jobs * diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java new file mode 100644 index 00000000000..b8f049c0775 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.jobhistory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.mapreduce.TaskType; + +/** + * Event to record the normalized map/reduce requirements. + * + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class NormalizedResourceEvent implements HistoryEvent { + private int memory; + private TaskType taskType; + + /** + * Normalized request when sent to the Resource Manager. + * @param taskType the tasktype of the request. + * @param memory the normalized memory requirements. + */ + public NormalizedResourceEvent(TaskType taskType, int memory) { + this.memory = memory; + this.taskType = taskType; + } + + /** + * the tasktype for the event. + * @return the tasktype for the event. + */ + public TaskType getTaskType() { + return this.taskType; + } + + /** + * the normalized memory + * @return the normalized memory + */ + public int getMemory() { + return this.memory; + } + + @Override + public EventType getEventType() { + return EventType.NORMALIZED_RESOURCE; + } + + @Override + public Object getDatum() { + throw new UnsupportedOperationException("Not a seriable object"); + } + + @Override + public void setDatum(Object datum) { + throw new UnsupportedOperationException("Not a seriable object"); + } +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java index fb20a2edc37..f269769edf2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java @@ -18,19 +18,15 @@ package org.apache.hadoop.mapreduce.jobhistory; -import java.io.IOException; - +import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.mapred.ProgressSplitsBlock; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapred.ProgressSplitsBlock; - -import org.apache.avro.util.Utf8; - /** * Event to record successful completion of a reduce attempt * diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java index 12639975e75..4c2b132b1c3 100644 
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java @@ -18,15 +18,12 @@ package org.apache.hadoop.mapreduce.jobhistory; -import java.io.IOException; - +import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; -import org.apache.avro.util.Utf8; - /** * Event to record the start of a task * diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java index cf4b51b5655..b4036cbdfa7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java @@ -1,20 +1,20 @@ /** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.hadoop.mapreduce.v2.hs; @@ -54,27 +54,32 @@ public class TestJobHistoryParsing { private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class); + @Test public void testHistoryParsing() throws Exception { Configuration conf = new Configuration(); long amStartTimeEst = System.currentTimeMillis(); - MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), true); + MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), + true); app.submit(conf); Job job = app.getContext().getAllJobs().values().iterator().next(); JobId jobId = job.getID(); LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString()); app.waitForState(job, JobState.SUCCEEDED); - - //make sure all events are flushed + + // make sure all events are flushed app.waitForState(Service.STATE.STOPPED); - - String jobhistoryDir = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf); + + String jobhistoryDir = JobHistoryUtils + .getHistoryIntermediateDoneDirForUser(conf); JobHistory jobHistory = new JobHistory(); jobHistory.init(conf); - - JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId).getJobIndexInfo(); - String jobhistoryFileName = FileNameIndexUtils.getDoneFileName(jobIndexInfo); - + + JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId) + .getJobIndexInfo(); + String jobhistoryFileName = FileNameIndexUtils + .getDoneFileName(jobIndexInfo); + Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName); FSDataInputStream in = null; LOG.info("JobHistoryFile is: " + historyFilePath); @@ -86,27 +91,24 @@ public void testHistoryParsing() throws Exception { LOG.info("Can not open history file: " + historyFilePath, ioe); throw (new Exception("Can not open History File")); } - + JobHistoryParser parser = new JobHistoryParser(in); JobInfo jobInfo = parser.parse(); - - Assert.assertEquals ("Incorrect username ", - "mapred", jobInfo.getUsername()); - Assert.assertEquals("Incorrect jobName ", - "test", jobInfo.getJobname()); - Assert.assertEquals("Incorrect queuename ", - "default", jobInfo.getJobQueueName()); - Assert.assertEquals("incorrect conf path", - "test", jobInfo.getJobConfPath()); - Assert.assertEquals("incorrect finishedMap ", - 2, jobInfo.getFinishedMaps()); - Assert.assertEquals("incorrect finishedReduces ", - 1, jobInfo.getFinishedReduces()); - Assert.assertEquals("incorrect uberized ", - job.isUber(), jobInfo.getUberized()); + + Assert.assertEquals("Incorrect username ", "mapred", jobInfo.getUsername()); + Assert.assertEquals("Incorrect jobName ", "test", jobInfo.getJobname()); + Assert.assertEquals("Incorrect queuename ", "default", + jobInfo.getJobQueueName()); + Assert + .assertEquals("incorrect conf path", "test", jobInfo.getJobConfPath()); + Assert.assertEquals("incorrect finishedMap ", 2, jobInfo.getFinishedMaps()); + Assert.assertEquals("incorrect finishedReduces ", 1, + jobInfo.getFinishedReduces()); + Assert.assertEquals("incorrect uberized ", job.isUber(), + jobInfo.getUberized()); int totalTasks = jobInfo.getAllTasks().size(); Assert.assertEquals("total number of tasks is incorrect ", 3, totalTasks); - + // Verify aminfo Assert.assertEquals(1, jobInfo.getAMInfos().size()); Assert.assertEquals("testhost", jobInfo.getAMInfos().get(0) @@ -120,15 +122,15 @@ public void testHistoryParsing() throws Exception { && amInfo.getStartTime() >= amStartTimeEst); ContainerId fakeCid = BuilderUtils.newContainerId(-1, -1, -1, -1); - //Assert at taskAttempt level + // Assert at taskAttempt level for (TaskInfo taskInfo : 
jobInfo.getAllTasks().values()) { int taskAttemptCount = taskInfo.getAllTaskAttempts().size(); - Assert.assertEquals("total number of task attempts ", - 1, taskAttemptCount); - TaskAttemptInfo taInfo = - taskInfo.getAllTaskAttempts().values().iterator().next(); + Assert + .assertEquals("total number of task attempts ", 1, taskAttemptCount); + TaskAttemptInfo taInfo = taskInfo.getAllTaskAttempts().values() + .iterator().next(); Assert.assertNotNull(taInfo.getContainerId()); - //Verify the wrong ctor is not being used. Remove after mrv1 is removed. + // Verify the wrong ctor is not being used. Remove after mrv1 is removed. Assert.assertFalse(taInfo.getContainerId().equals(fakeCid)); } @@ -138,9 +140,8 @@ public void testHistoryParsing() throws Exception { TypeConverter.fromYarn(task.getID())); Assert.assertNotNull("TaskInfo not found", taskInfo); for (TaskAttempt taskAttempt : task.getAttempts().values()) { - TaskAttemptInfo taskAttemptInfo = - taskInfo.getAllTaskAttempts().get( - TypeConverter.fromYarn((taskAttempt.getID()))); + TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get( + TypeConverter.fromYarn((taskAttempt.getID()))); Assert.assertNotNull("TaskAttemptInfo not found", taskAttemptInfo); Assert.assertEquals("Incorrect shuffle port for task attempt", taskAttempt.getShufflePort(), taskAttemptInfo.getShufflePort()); @@ -151,6 +152,8 @@ public void testHistoryParsing() throws Exception { .getIntermediateSummaryFileName(jobId); Path summaryFile = new Path(jobhistoryDir, summaryFileName); String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile); + Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100")); + Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100")); Assert.assertNotNull(jobSummaryString); Map jobSummaryElements = new HashMap(); diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java index 31bd95b1cc2..09051795af0 100644 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java +++ b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java @@ -17,16 +17,13 @@ */ package org.apache.hadoop.mapreduce.jobhistory; -import java.util.List; -import java.util.ArrayList; +import junit.framework.TestCase; import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskType; -import junit.framework.TestCase; - /** * Test various jobhistory events */
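
For reference, the behaviour this patch adds can be condensed outside the diff: RMContainerAllocator rounds each requested container memory up to a multiple of the scheduler's minimum container size, raises a NORMALIZED_RESOURCE history event (which JobHistoryEventHandler deliberately skips when writing the history file), and processEventForJobSummary copies the value into the new resourcesPerMap / resourcesPerReduce fields that now appear in the job-summary line. The sketch below is illustrative only; the class name NormalizedResourceExample and the memory figures (1024/1536/3000 MB) are invented for the example, and the real code routes the event through the AM event dispatcher instead of calling the setters directly.

import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;

public class NormalizedResourceExample {

  // Same rounding the patch adds in RMContainerAllocator: round the request
  // up to the next multiple of the minimum slot (container) size.
  static int normalize(int requestedMb, int minSlotMb) {
    return (int) Math.ceil((float) requestedMb / minSlotMb) * minSlotMb;
  }

  public static void main(String[] args) {
    int minSlotMb = 1024;                        // assumed minimum container size
    int mapMb    = normalize(1536, minSlotMb);   // ceil(1.5)  * 1024 = 2048
    int reduceMb = normalize(3000, minSlotMb);   // ceil(2.93) * 1024 = 3072

    // The allocator announces the normalized sizes as history events ...
    NormalizedResourceEvent mapEvent =
        new NormalizedResourceEvent(TaskType.MAP, mapMb);
    NormalizedResourceEvent reduceEvent =
        new NormalizedResourceEvent(TaskType.REDUCE, reduceMb);

    // ... and processEventForJobSummary() stores them on the summary, so that
    // getJobSummaryString() now carries resourcesPerMap=... and
    // resourcesPerReduce=... (the values the new history-parsing test asserts).
    JobSummary summary = new JobSummary();
    summary.setResourcesPerMap(mapEvent.getMemory());
    summary.setResourcesPerReduce(reduceEvent.getMemory());

    System.out.println("resourcesPerMap=" + summary.getResourcesPerMap()
        + " resourcesPerReduce=" + summary.getResourcesPerReduce());
  }
}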