Merge -c 1188528 from trunk to branch-0.23 to complete fix for MAPREDUCE-2821.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1188529 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 2011-10-25 06:27:24 +00:00
parent ddccefbefb
commit 1bef05715d
14 changed files with 240 additions and 122 deletions

View File

@@ -1703,6 +1703,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2746. Yarn servers can't communicate with each other with
     hadoop.security.authorization set to true (acmurthy via mahadev)
 
+    MAPREDUCE-2821. Added missing fields (resourcePerMap & resourcePerReduce)
+    to JobSummary logs. (mahadev via acmurthy)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

View File

@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.v2.api.records.Counter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -91,7 +92,8 @@ public class JobHistoryEventHandler extends AbstractService
   }
 
   /* (non-Javadoc)
-   * @see org.apache.hadoop.yarn.service.AbstractService#init(org.apache.hadoop.conf.Configuration)
+   * @see org.apache.hadoop.yarn.service.AbstractService#init(org.
+   * apache.hadoop.conf.Configuration)
    * Initializes the FileSystem and Path objects for the log and done directories.
    * Creates these directories if they do not already exist.
    */
@@ -155,14 +157,15 @@ public class JobHistoryEventHandler extends AbstractService
               + doneDirPath
               + "] based on conf: "
               + MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR
-              + ". Either set to true or pre-create this directory with appropriate permissions";
+              + ". Either set to true or pre-create this directory with" +
+              " appropriate permissions";
           LOG.error(message);
           throw new YarnException(message);
         }
       }
     } catch (IOException e) {
-      LOG.error("Failed checking for the existance of history intermediate done directory: ["
-          + doneDirPath + "]");
+      LOG.error("Failed checking for the existance of history intermediate " +
+          "done directory: [" + doneDirPath + "]");
       throw new YarnException(e);
     }
@@ -380,8 +383,11 @@ public class JobHistoryEventHandler extends AbstractService
     MetaInfo mi = fileMap.get(event.getJobID());
     try {
       HistoryEvent historyEvent = event.getHistoryEvent();
-      mi.writeEvent(historyEvent);
-      processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID());
+      if (! (historyEvent instanceof NormalizedResourceEvent)) {
+        mi.writeEvent(historyEvent);
+      }
+      processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
+          event.getJobID());
       LOG.info("In HistoryEventHandler "
           + event.getHistoryEvent().getEventType());
     } catch (IOException e) {
@@ -415,7 +421,8 @@ public class JobHistoryEventHandler extends AbstractService
       if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED
           || event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) {
         try {
-          JobUnsuccessfulCompletionEvent jucEvent = (JobUnsuccessfulCompletionEvent) event
+          JobUnsuccessfulCompletionEvent jucEvent =
+              (JobUnsuccessfulCompletionEvent) event
               .getHistoryEvent();
           mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
           mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
@@ -429,7 +436,8 @@ public class JobHistoryEventHandler extends AbstractService
     }
   }
 
-  private void processEventForJobSummary(HistoryEvent event, JobSummary summary, JobId jobId) {
+  public void processEventForJobSummary(HistoryEvent event, JobSummary summary,
+      JobId jobId) {
     // context.getJob could be used for some of this info as well.
     switch (event.getEventType()) {
     case JOB_SUBMITTED:
@@ -438,6 +446,15 @@ public class JobHistoryEventHandler extends AbstractService
       summary.setQueue(jse.getJobQueueName());
       summary.setJobSubmitTime(jse.getSubmitTime());
       break;
+    case NORMALIZED_RESOURCE:
+      NormalizedResourceEvent normalizedResourceEvent =
+          (NormalizedResourceEvent) event;
+      if (normalizedResourceEvent.getTaskType() == TaskType.MAP) {
+        summary.setResourcesPerMap(normalizedResourceEvent.getMemory());
+      } else if (normalizedResourceEvent.getTaskType() == TaskType.REDUCE) {
+        summary.setResourcesPerReduce(normalizedResourceEvent.getMemory());
+      }
+      break;
     case JOB_INITED:
       JobInitedEvent jie = (JobInitedEvent) event;
       summary.setJobLaunchTime(jie.getLaunchTime());
@@ -503,7 +520,8 @@ public class JobHistoryEventHandler extends AbstractService
       if (!mi.isWriterActive()) {
         throw new IOException(
-            "Inactive Writer: Likely received multiple JobFinished / JobUnsuccessful events for JobId: ["
+            "Inactive Writer: Likely received multiple JobFinished / " +
+            "JobUnsuccessful events for JobId: ["
                 + jobId + "]");
       }
@@ -594,7 +612,8 @@ public class JobHistoryEventHandler extends AbstractService
       this.historyFile = historyFile;
       this.confFile = conf;
       this.writer = writer;
-      this.jobIndexInfo = new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null);
+      this.jobIndexInfo = new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1,
+          null);
       this.jobSummary = new JobSummary();
     }

View File

@@ -34,7 +34,8 @@ public class JobSummary {
   private int numFailedMaps;
   private int numFinishedReduces;
   private int numFailedReduces;
-  // private int numSlotsPerMap; | Doesn't make sense with potentially different
+  private int resourcesPerMap; // resources used per map/min resource
+  private int resourcesPerReduce; // resources used per reduce/min resource
   // resource models
   // private int numSlotsPerReduce; | Doesn't make sense with potentially
   // different resource models
@@ -112,13 +113,13 @@ public class JobSummary {
     this.numFailedMaps = numFailedMaps;
   }
 
-  // public int getNumSlotsPerMap() {
-  //   return numSlotsPerMap;
-  // }
-  //
-  // public void setNumSlotsPerMap(int numSlotsPerMap) {
-  //   this.numSlotsPerMap = numSlotsPerMap;
-  // }
+  public int getResourcesPerMap() {
+    return resourcesPerMap;
+  }
+
+  public void setResourcesPerMap(int resourcesPerMap) {
+    this.resourcesPerMap = resourcesPerMap;
+  }
 
   public int getNumFinishedReduces() {
     return numFinishedReduces;
@@ -136,13 +137,13 @@ public class JobSummary {
     this.numFailedReduces = numFailedReduces;
   }
 
-  // public int getNumSlotsPerReduce() {
-  //   return numSlotsPerReduce;
-  // }
-  //
-  // public void setNumSlotsPerReduce(int numSlotsPerReduce) {
-  //   this.numSlotsPerReduce = numSlotsPerReduce;
-  // }
+  public int getResourcesPerReduce() {
+    return this.resourcesPerReduce;
+  }
+
+  public void setResourcesPerReduce(int resourcesPerReduce) {
+    this.resourcesPerReduce = resourcesPerReduce;
+  }
 
   public String getUser() {
     return user;
@@ -184,14 +185,6 @@ public class JobSummary {
     this.reduceSlotSeconds = reduceSlotSeconds;
   }
 
-  // public int getClusterSlotCapacity() {
-  //   return clusterSlotCapacity;
-  // }
-  //
-  // public void setClusterSlotCapacity(int clusterSlotCapacity) {
-  //   this.clusterSlotCapacity = clusterSlotCapacity;
-  // }
-
   public String getJobSummaryString() {
     SummaryBuilder summary = new SummaryBuilder()
       .add("jobId", jobId)
@@ -200,6 +193,8 @@ public class JobSummary {
       .add("firstMapTaskLaunchTime", firstMapTaskLaunchTime)
       .add("firstReduceTaskLaunchTime", firstReduceTaskLaunchTime)
       .add("finishTime", jobFinishTime)
+      .add("resourcesPerMap", resourcesPerMap)
+      .add("resourcesPerReduce", resourcesPerReduce)
       .add("numMaps", numFinishedMaps + numFailedMaps)
       .add("numReduces", numFinishedReduces + numFailedReduces)
       .add("user", user)

View File

@@ -91,12 +91,12 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -115,10 +115,10 @@ import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -994,7 +994,7 @@ public abstract class TaskAttemptImpl implements
   private static class ContainerAssignedTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({ "unchecked", "deprecation" })
     @Override
     public void transition(final TaskAttemptImpl taskAttempt,
         TaskAttemptEvent event) {
@@ -1164,6 +1164,7 @@
     @Override
     public void transition(TaskAttemptImpl taskAttempt,
         TaskAttemptEvent event) {
+      @SuppressWarnings("deprecation")
       TaskAttemptContext taskContext =
         new TaskAttemptContextImpl(new JobConf(taskAttempt.conf),
             TypeConverter.fromYarn(taskAttempt.attemptId));

View File

@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -37,7 +35,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -169,6 +172,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     LOG.info("Final Stats: " + getStat());
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public synchronized void handle(ContainerAllocatorEvent event) {
     LOG.info("Processing the event " + event.toString());
@@ -179,7 +183,13 @@ public class RMContainerAllocator extends RMContainerRequestor
       if (mapResourceReqt == 0) {
         mapResourceReqt = reqEvent.getCapability().getMemory();
         int minSlotMemSize = getMinContainerCapability().getMemory();
-        mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize) * minSlotMemSize;
+        mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize)
+            * minSlotMemSize;
+        JobID id = TypeConverter.fromYarn(applicationId);
+        JobId jobId = TypeConverter.toYarn(id);
+        eventHandler.handle(new JobHistoryEvent(jobId,
+            new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
+                mapResourceReqt)));
         LOG.info("mapResourceReqt:"+mapResourceReqt);
         if (mapResourceReqt > getMaxContainerCapability().getMemory()) {
           String diagMsg = "MAP capability required is more than the supported " +
@@ -199,12 +209,20 @@ public class RMContainerAllocator extends RMContainerRequestor
         reduceResourceReqt = reqEvent.getCapability().getMemory();
         int minSlotMemSize = getMinContainerCapability().getMemory();
         //round off on slotsize
-        reduceResourceReqt = (int) Math.ceil((float) reduceResourceReqt/minSlotMemSize) * minSlotMemSize;
+        reduceResourceReqt = (int) Math.ceil((float)
+            reduceResourceReqt/minSlotMemSize) * minSlotMemSize;
+        JobID id = TypeConverter.fromYarn(applicationId);
+        JobId jobId = TypeConverter.toYarn(id);
+        eventHandler.handle(new JobHistoryEvent(jobId,
+            new NormalizedResourceEvent(
+                org.apache.hadoop.mapreduce.TaskType.REDUCE,
+                reduceResourceReqt)));
         LOG.info("reduceResourceReqt:"+reduceResourceReqt);
         if (reduceResourceReqt > getMaxContainerCapability().getMemory()) {
-          String diagMsg = "REDUCE capability required is more than the supported " +
-              "max container capability in the cluster. Killing the Job. reduceResourceReqt: " +
-              reduceResourceReqt + " maxContainerCapability:" + getMaxContainerCapability().getMemory();
+          String diagMsg = "REDUCE capability required is more than the " +
+              "supported max container capability in the cluster. Killing the " +
+              "Job. reduceResourceReqt: " + reduceResourceReqt +
+              " maxContainerCapability:" + getMaxContainerCapability().getMemory();
           LOG.info(diagMsg);
           eventHandler.handle(new JobDiagnosticsUpdateEvent(
               getJob().getID(), diagMsg));
@@ -217,7 +235,8 @@ public class RMContainerAllocator extends RMContainerRequestor
         //add to the front of queue for fail fast
         pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
       } else {
-        pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE));//reduces are added to pending and are slowly ramped up
+        pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
+        //reduces are added to pending and are slowly ramped up
       }
     }
@@ -411,6 +430,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       " availableResources(headroom):" + getAvailableResources();
   }
 
+  @SuppressWarnings("unchecked")
   private List<Container> getResources() throws Exception {
     int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null
     AMResponse response = makeRemoteRequest();
@@ -538,6 +558,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       addContainerReq(req);
     }
 
+    @SuppressWarnings("unchecked")
     private void assign(List<Container> allocatedContainers) {
       Iterator<Container> it = allocatedContainers.iterator();
       LOG.info("Got allocated containers " + allocatedContainers.size());
@@ -694,6 +715,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     }
 
+    @SuppressWarnings("unchecked")
     private ContainerRequest assignToFailedMap(Container allocated) {
       //try to assign to earlierFailedMaps if present
       ContainerRequest assigned = null;
@@ -723,6 +745,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       return assigned;
     }
 
+    @SuppressWarnings("unchecked")
    private ContainerRequest assignToMap(Container allocated) {
      //try to assign to maps if present
      //first by host, then by rack, followed by *
@@ -798,7 +821,8 @@ public class RMContainerAllocator extends RMContainerRequestor
     }
 
     void preemptReduce(int toPreempt) {
-      List<TaskAttemptId> reduceList = new ArrayList(reduces.keySet());
+      List<TaskAttemptId> reduceList = new ArrayList<TaskAttemptId>
+          (reduces.keySet());
       //sort reduces on progress
       Collections.sort(reduceList,
           new Comparator<TaskAttemptId>() {
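The rounding applied above is the "normalization": the requested memory is rounded up to the next multiple of the minimum container (slot) size, and that rounded figure is what the NormalizedResourceEvent carries into the job summary. A small self-contained sketch of the arithmetic, with made-up numbers:

public class NormalizeSketch {
  // Same expression as in the allocator hunks above: round the request up to
  // the nearest multiple of the minimum container size.
  static int normalize(int requestedMb, int minSlotMb) {
    return (int) Math.ceil((float) requestedMb / minSlotMb) * minSlotMb;
  }

  public static void main(String[] args) {
    System.out.println(normalize(1300, 1024)); // 2048
    System.out.println(normalize(1024, 1024)); // 1024
    System.out.println(normalize(1500, 512));  // 1536
  }
}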

View File

@@ -31,9 +31,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -360,6 +363,16 @@ public class MRApp extends MRAppMaster {
       NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234);
       Container container = BuilderUtils.newContainer(cId, nodeId,
           "localhost:9999", null, null, null);
+      JobID id = TypeConverter.fromYarn(applicationId);
+      JobId jobId = TypeConverter.toYarn(id);
+      getContext().getEventHandler().handle(new JobHistoryEvent(jobId,
+          new NormalizedResourceEvent(
+              org.apache.hadoop.mapreduce.TaskType.REDUCE,
+              100)));
+      getContext().getEventHandler().handle(new JobHistoryEvent(jobId,
+          new NormalizedResourceEvent(
+              org.apache.hadoop.mapreduce.TaskType.MAP,
+              100)));
       getContext().getEventHandler().handle(
           new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
               container, null));

View File

@ -256,6 +256,7 @@
"TASK_FINISHED", "TASK_FINISHED",
"TASK_FAILED", "TASK_FAILED",
"TASK_UPDATED", "TASK_UPDATED",
"NORMALIZED_RESOURCE",
"MAP_ATTEMPT_STARTED", "MAP_ATTEMPT_STARTED",
"MAP_ATTEMPT_FINISHED", "MAP_ATTEMPT_FINISHED",
"MAP_ATTEMPT_FAILED", "MAP_ATTEMPT_FAILED",

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.mapreduce.jobhistory; package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;

View File

@ -18,14 +18,11 @@
package org.apache.hadoop.mapreduce.jobhistory; package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException; import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobID;
import org.apache.avro.util.Utf8;
/** /**
* Event to record Failed and Killed completion of jobs * Event to record Failed and Killed completion of jobs
* *

View File

@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Event to record the normalized map/reduce requirements.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class NormalizedResourceEvent implements HistoryEvent {
+  private int memory;
+  private TaskType taskType;
+
+  /**
+   * Normalized request when sent to the Resource Manager.
+   * @param taskType the tasktype of the request.
+   * @param memory the normalized memory requirements.
+   */
+  public NormalizedResourceEvent(TaskType taskType, int memory) {
+    this.memory = memory;
+    this.taskType = taskType;
+  }
+
+  /**
+   * the tasktype for the event.
+   * @return the tasktype for the event.
+   */
+  public TaskType getTaskType() {
+    return this.taskType;
+  }
+
+  /**
+   * the normalized memory
+   * @return the normalized memory
+   */
+  public int getMemory() {
+    return this.memory;
+  }
+
+  @Override
+  public EventType getEventType() {
+    return EventType.NORMALIZED_RESOURCE;
+  }
+
+  @Override
+  public Object getDatum() {
+    throw new UnsupportedOperationException("Not a seriable object");
+  }
+
+  @Override
+  public void setDatum(Object datum) {
+    throw new UnsupportedOperationException("Not a seriable object");
+  }
+}
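Emitting the new event follows the same pattern the RMContainerAllocator and MRApp hunks use: wrap it in a JobHistoryEvent and hand it to the job-history event handler. A minimal sketch; the helper class and method names below are hypothetical:

import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.yarn.event.EventHandler;

class ResourceEventEmitter {
  // Report a normalized map-memory request (in MB) to the history handler,
  // mirroring the call sites added in this commit.
  static void reportNormalizedMapMemory(EventHandler<JobHistoryEvent> handler,
      JobId jobId, int normalizedMemoryMb) {
    handler.handle(new JobHistoryEvent(jobId,
        new NormalizedResourceEvent(TaskType.MAP, normalizedMemoryMb)));
  }
}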

View File

@ -18,19 +18,15 @@
package org.apache.hadoop.mapreduce.jobhistory; package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException; import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.ProgressSplitsBlock;
import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapred.ProgressSplitsBlock;
import org.apache.avro.util.Utf8;
/** /**
* Event to record successful completion of a reduce attempt * Event to record successful completion of a reduce attempt
* *

View File

@ -18,15 +18,12 @@
package org.apache.hadoop.mapreduce.jobhistory; package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException; import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.avro.util.Utf8;
/** /**
* Event to record the start of a task * Event to record the start of a task
* *

View File

@@ -1,20 +1,20 @@
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
 package org.apache.hadoop.mapreduce.v2.hs;
 
@@ -54,26 +54,31 @@ import org.junit.Test;
 public class TestJobHistoryParsing {
   private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class);
 
   @Test
   public void testHistoryParsing() throws Exception {
     Configuration conf = new Configuration();
     long amStartTimeEst = System.currentTimeMillis();
-    MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), true);
+    MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(),
+        true);
     app.submit(conf);
     Job job = app.getContext().getAllJobs().values().iterator().next();
     JobId jobId = job.getID();
     LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
     app.waitForState(job, JobState.SUCCEEDED);
-    //make sure all events are flushed
+
+    // make sure all events are flushed
     app.waitForState(Service.STATE.STOPPED);
-    String jobhistoryDir = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
+
+    String jobhistoryDir = JobHistoryUtils
+        .getHistoryIntermediateDoneDirForUser(conf);
     JobHistory jobHistory = new JobHistory();
     jobHistory.init(conf);
-    JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId).getJobIndexInfo();
-    String jobhistoryFileName = FileNameIndexUtils.getDoneFileName(jobIndexInfo);
+    JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId)
+        .getJobIndexInfo();
+    String jobhistoryFileName = FileNameIndexUtils
+        .getDoneFileName(jobIndexInfo);
     Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
 
     FSDataInputStream in = null;
@@ -90,20 +95,17 @@ public class TestJobHistoryParsing {
     JobHistoryParser parser = new JobHistoryParser(in);
     JobInfo jobInfo = parser.parse();
 
-    Assert.assertEquals ("Incorrect username ",
-        "mapred", jobInfo.getUsername());
-    Assert.assertEquals("Incorrect jobName ",
-        "test", jobInfo.getJobname());
-    Assert.assertEquals("Incorrect queuename ",
-        "default", jobInfo.getJobQueueName());
-    Assert.assertEquals("incorrect conf path",
-        "test", jobInfo.getJobConfPath());
-    Assert.assertEquals("incorrect finishedMap ",
-        2, jobInfo.getFinishedMaps());
-    Assert.assertEquals("incorrect finishedReduces ",
-        1, jobInfo.getFinishedReduces());
-    Assert.assertEquals("incorrect uberized ",
-        job.isUber(), jobInfo.getUberized());
+    Assert.assertEquals("Incorrect username ", "mapred", jobInfo.getUsername());
+    Assert.assertEquals("Incorrect jobName ", "test", jobInfo.getJobname());
+    Assert.assertEquals("Incorrect queuename ", "default",
+        jobInfo.getJobQueueName());
+    Assert
+        .assertEquals("incorrect conf path", "test", jobInfo.getJobConfPath());
+    Assert.assertEquals("incorrect finishedMap ", 2, jobInfo.getFinishedMaps());
+    Assert.assertEquals("incorrect finishedReduces ", 1,
+        jobInfo.getFinishedReduces());
+    Assert.assertEquals("incorrect uberized ", job.isUber(),
+        jobInfo.getUberized());
     int totalTasks = jobInfo.getAllTasks().size();
     Assert.assertEquals("total number of tasks is incorrect ", 3, totalTasks);
@@ -120,15 +122,15 @@ public class TestJobHistoryParsing {
         && amInfo.getStartTime() >= amStartTimeEst);
 
     ContainerId fakeCid = BuilderUtils.newContainerId(-1, -1, -1, -1);
-    //Assert at taskAttempt level
+    // Assert at taskAttempt level
     for (TaskInfo taskInfo : jobInfo.getAllTasks().values()) {
       int taskAttemptCount = taskInfo.getAllTaskAttempts().size();
-      Assert.assertEquals("total number of task attempts ",
-          1, taskAttemptCount);
-      TaskAttemptInfo taInfo =
-          taskInfo.getAllTaskAttempts().values().iterator().next();
+      Assert
+          .assertEquals("total number of task attempts ", 1, taskAttemptCount);
+      TaskAttemptInfo taInfo = taskInfo.getAllTaskAttempts().values()
+          .iterator().next();
       Assert.assertNotNull(taInfo.getContainerId());
-      //Verify the wrong ctor is not being used. Remove after mrv1 is removed.
+      // Verify the wrong ctor is not being used. Remove after mrv1 is removed.
       Assert.assertFalse(taInfo.getContainerId().equals(fakeCid));
     }
@@ -138,8 +140,7 @@ public class TestJobHistoryParsing {
           TypeConverter.fromYarn(task.getID()));
       Assert.assertNotNull("TaskInfo not found", taskInfo);
       for (TaskAttempt taskAttempt : task.getAttempts().values()) {
-        TaskAttemptInfo taskAttemptInfo =
-            taskInfo.getAllTaskAttempts().get(
+        TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
                 TypeConverter.fromYarn((taskAttempt.getID())));
         Assert.assertNotNull("TaskAttemptInfo not found", taskAttemptInfo);
         Assert.assertEquals("Incorrect shuffle port for task attempt",
@@ -151,6 +152,8 @@ public class TestJobHistoryParsing {
         .getIntermediateSummaryFileName(jobId);
     Path summaryFile = new Path(jobhistoryDir, summaryFileName);
     String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile);
+    Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
+    Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
    Assert.assertNotNull(jobSummaryString);
    Map<String, String> jobSummaryElements = new HashMap<String, String>();

View File

@ -17,16 +17,13 @@
*/ */
package org.apache.hadoop.mapreduce.jobhistory; package org.apache.hadoop.mapreduce.jobhistory;
import java.util.List; import junit.framework.TestCase;
import java.util.ArrayList;
import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import junit.framework.TestCase;
/** /**
* Test various jobhistory events * Test various jobhistory events
*/ */