Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1225264 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2011-12-28 18:37:22 +00:00
commit d619dee4ed
23 changed files with 391 additions and 168 deletions

View File

@ -77,6 +77,10 @@ Trunk (unreleased changes)
HADOOP-7899. Generate proto java files as part of the build. (tucu) HADOOP-7899. Generate proto java files as part of the build. (tucu)
HADOOP-7574. Improve FSShell -stat, add user/group elements (XieXianshan via harsh)
HADOOP-7348. Change 'addnl' in getmerge util to be a flag '-nl' instead (XieXianshan via harsh)
BUGS BUGS
HADOOP-7851. Configuration.getClasses() never returns the default value. HADOOP-7851. Configuration.getClasses() never returns the default value.

View File

@ -260,11 +260,11 @@
<section> <section>
<title> getmerge </title> <title> getmerge </title>
<p> <p>
<code>Usage: hdfs dfs -getmerge &lt;src&gt; &lt;localdst&gt; [addnl]</code> <code>Usage: hdfs dfs -getmerge [-nl] &lt;src&gt; &lt;localdst&gt;</code>
</p> </p>
<p> <p>
Takes a source directory and a destination file as input and concatenates files in src into the destination local file. Takes a source directory and a destination file as input and concatenates files in src into the destination local file.
Optionally <code>addnl</code> can be set to enable adding a newline character at the end of each file. Optionally <code>-nl</code> flag can be set to enable adding a newline character at the end of each file during merge.
</p> </p>
</section> </section>

View File

@ -45,26 +45,22 @@ public static void registerCommands(CommandFactory factory) {
/** merge multiple files together */ /** merge multiple files together */
public static class Merge extends FsCommand { public static class Merge extends FsCommand {
public static final String NAME = "getmerge"; public static final String NAME = "getmerge";
public static final String USAGE = "<src> <localdst> [addnl]"; public static final String USAGE = "[-nl] <src> <localdst>";
public static final String DESCRIPTION = public static final String DESCRIPTION =
"Get all the files in the directories that\n" + "Get all the files in the directories that\n" +
"match the source file pattern and merge and sort them to only\n" + "match the source file pattern and merge and sort them to only\n" +
"one file on local fs. <src> is kept."; "one file on local fs. <src> is kept.\n" +
" -nl Add a newline character at the end of each file.";
protected PathData dst = null; protected PathData dst = null;
protected String delimiter = null; protected String delimiter = null;
@Override @Override
protected void processOptions(LinkedList<String> args) throws IOException { protected void processOptions(LinkedList<String> args) throws IOException {
CommandFormat cf = new CommandFormat(2, 3); CommandFormat cf = new CommandFormat(2, 3, "nl");
cf.parse(args); cf.parse(args);
// TODO: this really should be a -nl option delimiter = cf.getOpt("nl") ? "\n" : null;
if ((args.size() > 2) && Boolean.parseBoolean(args.removeLast())) {
delimiter = "\n";
} else {
delimiter = null;
}
dst = new PathData(new File(args.removeLast()), getConf()); dst = new PathData(new File(args.removeLast()), getConf());
} }

View File

@ -32,9 +32,11 @@
* Print statistics about path in specified format. * Print statistics about path in specified format.
* Format sequences: * Format sequences:
* %b: Size of file in blocks * %b: Size of file in blocks
* %g: Group name of owner
* %n: Filename * %n: Filename
* %o: Block size * %o: Block size
* %r: replication * %r: replication
* %u: User name of owner
* %y: UTC date as &quot;yyyy-MM-dd HH:mm:ss&quot; * %y: UTC date as &quot;yyyy-MM-dd HH:mm:ss&quot;
* %Y: Milliseconds since January 1, 1970 UTC * %Y: Milliseconds since January 1, 1970 UTC
*/ */
@ -50,8 +52,8 @@ public static void registerCommands(CommandFactory factory) {
public static final String USAGE = "[format] <path> ..."; public static final String USAGE = "[format] <path> ...";
public static final String DESCRIPTION = public static final String DESCRIPTION =
"Print statistics about the file/directory at <path>\n" + "Print statistics about the file/directory at <path>\n" +
"in the specified format. Format accepts filesize in blocks (%b), filename (%n),\n" + "in the specified format. Format accepts filesize in blocks (%b), group name of owner(%g),\n" +
"block size (%o), replication (%r), modification date (%y, %Y)\n"; "filename (%n), block size (%o), replication (%r), user name of owner(%u), modification date (%y, %Y)\n";
protected static final SimpleDateFormat timeFmt; protected static final SimpleDateFormat timeFmt;
static { static {
@ -92,6 +94,9 @@ protected void processPath(PathData item) throws IOException {
? "directory" ? "directory"
: (stat.isFile() ? "regular file" : "symlink")); : (stat.isFile() ? "regular file" : "symlink"));
break; break;
case 'g':
buf.append(stat.getGroup());
break;
case 'n': case 'n':
buf.append(item.path.getName()); buf.append(item.path.getName());
break; break;
@ -101,6 +106,9 @@ protected void processPath(PathData item) throws IOException {
case 'r': case 'r':
buf.append(stat.getReplication()); buf.append(stat.getReplication());
break; break;
case 'u':
buf.append(stat.getOwner());
break;
case 'y': case 'y':
buf.append(timeFmt.format(new Date(stat.getModificationTime()))); buf.append(timeFmt.format(new Date(stat.getModificationTime())));
break; break;

View File

@ -449,7 +449,7 @@
<comparators> <comparators>
<comparator> <comparator>
<type>RegexpComparator</type> <type>RegexpComparator</type>
<expected-output>^-getmerge &lt;src&gt; &lt;localdst&gt; \[addnl\]:( |\t)*Get all the files in the directories that( )*</expected-output> <expected-output>^-getmerge \[-nl\] &lt;src&gt; &lt;localdst&gt;:( |\t)*Get all the files in the directories that( )*</expected-output>
</comparator> </comparator>
<comparator> <comparator>
<type>RegexpComparator</type> <type>RegexpComparator</type>
@ -459,6 +459,10 @@
<type>RegexpComparator</type> <type>RegexpComparator</type>
<expected-output>^( |\t)*one file on local fs. &lt;src&gt; is kept.( )*</expected-output> <expected-output>^( |\t)*one file on local fs. &lt;src&gt; is kept.( )*</expected-output>
</comparator> </comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^( |\t)*-nl Add a newline character at the end of each file.( )*</expected-output>
</comparator>
</comparators> </comparators>
</test> </test>
@ -606,11 +610,11 @@
</comparator> </comparator>
<comparator> <comparator>
<type>RegexpComparator</type> <type>RegexpComparator</type>
<expected-output>^( |\t)*in the specified format. Format accepts filesize in blocks \(%b\), filename \(%n\),( )*</expected-output> <expected-output>^( |\t)*in the specified format. Format accepts filesize in blocks \(%b\), group name of owner\(%g\),( )*</expected-output>
</comparator> </comparator>
<comparator> <comparator>
<type>RegexpComparator</type> <type>RegexpComparator</type>
<expected-output>^( |\t)*block size \(%o\), replication \(%r\), modification date \(%y, %Y\)( )*</expected-output> <expected-output>^( |\t)*filename \(%n\), block size \(%o\), replication \(%r\), user name of owner\(%u\), modification date \(%y, %Y\)( )*</expected-output>
</comparator> </comparator>
</comparators> </comparators>
</test> </test>

View File

@ -49,6 +49,8 @@ Trunk (unreleased changes)
Move the support for multiple protocols to lower layer so that Writable, Move the support for multiple protocols to lower layer so that Writable,
PB and Avro can all use it (Sanjay) PB and Avro can all use it (Sanjay)
MAPREDUCE-2944. Improve checking of input for JobClient.displayTasks() (XieXianshan via harsh)
BUG FIXES BUG FIXES
MAPREDUCE-3349. Log rack-name in JobHistory for unsuccessful tasks. MAPREDUCE-3349. Log rack-name in JobHistory for unsuccessful tasks.
(Devaraj K and Amar Kamat via amarrk) (Devaraj K and Amar Kamat via amarrk)
@ -175,6 +177,12 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
Vavilapalli via sseth) Vavilapalli via sseth)
MAPREDUCE-3399. Modifying ContainerLocalizer to send a heartbeat to NM
immediately after downloading a resource instead of always waiting for a
second. (Siddarth Seth via vinodkv)
MAPREDUCE-3568. Optimized Job's progress calculations in MR AM. (vinodkv)
BUG FIXES BUG FIXES
MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob

View File

@ -228,7 +228,7 @@ public void init(final Configuration conf) {
+ recoveryEnabled + " recoverySupportedByCommitter: " + recoveryEnabled + " recoverySupportedByCommitter: "
+ recoverySupportedByCommitter + " ApplicationAttemptID: " + recoverySupportedByCommitter + " ApplicationAttemptID: "
+ appAttemptID.getAttemptId()); + appAttemptID.getAttemptId());
dispatcher = new AsyncDispatcher(); dispatcher = createDispatcher();
addIfService(dispatcher); addIfService(dispatcher);
} }
@ -291,6 +291,10 @@ public void init(final Configuration conf) {
super.init(conf); super.init(conf);
} // end of init() } // end of init()
protected Dispatcher createDispatcher() {
return new AsyncDispatcher();
}
private OutputCommitter createOutputCommitter(Configuration conf) { private OutputCommitter createOutputCommitter(Configuration conf) {
OutputCommitter committer = null; OutputCommitter committer = null;

View File

@ -53,6 +53,7 @@ public interface Job {
int getTotalReduces(); int getTotalReduces();
int getCompletedMaps(); int getCompletedMaps();
int getCompletedReduces(); int getCompletedReduces();
float getProgress();
boolean isUber(); boolean isUber();
String getUserName(); String getUserName();
String getQueueName(); String getQueueName();

View File

@ -128,6 +128,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private final String username; private final String username;
private final OutputCommitter committer; private final OutputCommitter committer;
private final Map<JobACL, AccessControlList> jobACLs; private final Map<JobACL, AccessControlList> jobACLs;
private float setupWeight = 0.05f;
private float cleanupWeight = 0.05f;
private float mapWeight = 0.0f;
private float reduceWeight = 0.0f;
private final Set<TaskId> completedTasksFromPreviousRun; private final Set<TaskId> completedTasksFromPreviousRun;
private final List<AMInfo> amInfos; private final List<AMInfo> amInfos;
private final Lock readLock; private final Lock readLock;
@ -147,7 +151,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private final long appSubmitTime; private final long appSubmitTime;
private boolean lazyTasksCopyNeeded = false; private boolean lazyTasksCopyNeeded = false;
private volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>(); volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
private Counters jobCounters = newCounters(); private Counters jobCounters = newCounters();
// FIXME: // FIXME:
// //
@ -353,6 +357,8 @@ JobEventType.JOB_KILL, new KillTasksTransition())
private long startTime; private long startTime;
private long finishTime; private long finishTime;
private float setupProgress; private float setupProgress;
private float mapProgress;
private float reduceProgress;
private float cleanupProgress; private float cleanupProgress;
private boolean isUber = false; private boolean isUber = false;
@ -587,30 +593,51 @@ public JobReport getReport() {
cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber); cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
} }
computeProgress();
return MRBuilderUtils.newJobReport(jobId, jobName, username, state, return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress, appSubmitTime, startTime, finishTime, setupProgress,
computeProgress(mapTasks), computeProgress(reduceTasks), this.mapProgress, this.reduceProgress,
cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber); cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
} }
private float computeProgress(Set<TaskId> taskIds) { @Override
readLock.lock(); public float getProgress() {
this.readLock.lock();
try { try {
float progress = 0; computeProgress();
for (TaskId taskId : taskIds) { return (this.setupProgress * this.setupWeight + this.cleanupProgress
Task task = tasks.get(taskId); * this.cleanupWeight + this.mapProgress * this.mapWeight + this.reduceProgress
progress += task.getProgress(); * this.reduceWeight);
}
int taskIdsSize = taskIds.size();
if (taskIdsSize != 0) {
progress = progress/taskIdsSize;
}
return progress;
} finally { } finally {
readLock.unlock(); this.readLock.unlock();
}
}
private void computeProgress() {
this.readLock.lock();
try {
float mapProgress = 0f;
float reduceProgress = 0f;
for (Task task : this.tasks.values()) {
if (task.getType() == TaskType.MAP) {
mapProgress += task.getProgress();
} else {
reduceProgress += task.getProgress();
}
}
if (this.numMapTasks != 0) {
mapProgress = mapProgress / this.numMapTasks;
}
if (this.numReduceTasks != 0) {
reduceProgress = reduceProgress / this.numReduceTasks;
}
this.mapProgress = mapProgress;
this.reduceProgress = reduceProgress;
} finally {
this.readLock.unlock();
} }
} }
@ -731,7 +758,7 @@ protected FileSystem getFileSystem(Configuration conf) throws IOException {
static JobState checkJobCompleteSuccess(JobImpl job) { static JobState checkJobCompleteSuccess(JobImpl job) {
// check for Job success // check for Job success
if (job.completedTaskCount == job.getTasks().size()) { if (job.completedTaskCount == job.tasks.size()) {
try { try {
// Commit job & do cleanup // Commit job & do cleanup
job.getCommitter().commitJob(job.getJobContext()); job.getCommitter().commitJob(job.getJobContext());
@ -970,6 +997,12 @@ public JobState transition(JobImpl job, JobEvent event) {
if (job.numMapTasks == 0 && job.numReduceTasks == 0) { if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
job.addDiagnostic("No of maps and reduces are 0 " + job.jobId); job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
} else if (job.numMapTasks == 0) {
job.reduceWeight = 0.9f;
} else if (job.numReduceTasks == 0) {
job.mapWeight = 0.9f;
} else {
job.mapWeight = job.reduceWeight = 0.45f;
} }
checkTaskLimits(); checkTaskLimits();

View File

@ -376,7 +376,7 @@ public float getProgress() {
try { try {
TaskAttempt bestAttempt = selectBestAttempt(); TaskAttempt bestAttempt = selectBestAttempt();
if (bestAttempt == null) { if (bestAttempt == null) {
return 0; return 0f;
} }
return bestAttempt.getProgress(); return bestAttempt.getProgress();
} finally { } finally {
@ -457,9 +457,10 @@ private TaskAttempt selectBestAttempt() {
result = at; //The first time around result = at; //The first time around
} }
// calculate the best progress // calculate the best progress
if (at.getProgress() > progress) { float attemptProgress = at.getProgress();
if (attemptProgress > progress) {
result = at; result = at;
progress = at.getProgress(); progress = attemptProgress;
} }
} }
return result; return result;

View File

@ -128,25 +128,7 @@ protected Job getJob() {
protected float getApplicationProgress() { protected float getApplicationProgress() {
// For now just a single job. In future when we have a DAG, we need an // For now just a single job. In future when we have a DAG, we need an
// aggregate progress. // aggregate progress.
JobReport report = this.job.getReport(); return this.job.getProgress();
float setupWeight = 0.05f;
float cleanupWeight = 0.05f;
float mapWeight = 0.0f;
float reduceWeight = 0.0f;
int numMaps = this.job.getTotalMaps();
int numReduces = this.job.getTotalReduces();
if (numMaps == 0 && numReduces == 0) {
} else if (numMaps == 0) {
reduceWeight = 0.9f;
} else if (numReduces == 0) {
mapWeight = 0.9f;
} else {
mapWeight = reduceWeight = 0.45f;
}
return (report.getSetupProgress() * setupWeight
+ report.getCleanupProgress() * cleanupWeight
+ report.getMapProgress() * mapWeight + report.getReduceProgress()
* reduceWeight);
} }
protected void register() { protected void register() {

View File

@ -161,7 +161,7 @@ protected AMResponse makeRemoteRequest() throws YarnRemoteException {
" finishedContainers=" + " finishedContainers=" +
response.getCompletedContainersStatuses().size() + response.getCompletedContainersStatuses().size() +
" resourcelimit=" + availableResources + " resourcelimit=" + availableResources +
"knownNMs=" + clusterNmCount); " knownNMs=" + clusterNmCount);
ask.clear(); ask.clear();
release.clear(); release.clear();

View File

@ -115,7 +115,8 @@ public class MRApp extends MRAppMaster {
applicationId.setId(0); applicationId.setId(0);
} }
public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart) { public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1); this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
} }
@ -143,8 +144,15 @@ private static ContainerId getContainerId(ApplicationId applicationId,
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) { boolean cleanOnStart, int startCount) {
super(getApplicationAttemptId(applicationId, startCount), getContainerId( this(getApplicationAttemptId(applicationId, startCount), getContainerId(
applicationId, startCount), NM_HOST, NM_PORT, NM_HTTP_PORT, System applicationId, startCount), maps, reduces, autoComplete, testName,
cleanOnStart, startCount);
}
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) {
super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, System
.currentTimeMillis()); .currentTimeMillis());
this.testWorkDir = new File("target", testName); this.testWorkDir = new File("target", testName);
testAbsPath = new Path(testWorkDir.getAbsolutePath()); testAbsPath = new Path(testWorkDir.getAbsolutePath());
@ -205,9 +213,9 @@ public void waitForState(Task task, TaskState finalState) throws Exception {
TaskReport report = task.getReport(); TaskReport report = task.getReport();
while (!finalState.equals(report.getTaskState()) && while (!finalState.equals(report.getTaskState()) &&
timeoutSecs++ < 20) { timeoutSecs++ < 20) {
System.out.println("Task State is : " + report.getTaskState() + System.out.println("Task State for " + task.getID() + " is : "
" Waiting for state : " + finalState + + report.getTaskState() + " Waiting for state : " + finalState
" progress : " + report.getProgress()); + " progress : " + report.getProgress());
report = task.getReport(); report = task.getReport();
Thread.sleep(500); Thread.sleep(500);
} }

View File

@ -425,6 +425,11 @@ public JobReport getReport() {
return report; return report;
} }
@Override
public float getProgress() {
return 0;
}
@Override @Override
public Counters getCounters() { public Counters getCounters() {
return counters; return counters;

View File

@ -26,6 +26,7 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -36,15 +37,20 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
@ -78,6 +84,7 @@
import org.junit.After; import org.junit.After;
import org.junit.Test; import org.junit.Test;
@SuppressWarnings("unchecked")
public class TestRMContainerAllocator { public class TestRMContainerAllocator {
static final Log LOG = LogFactory static final Log LOG = LogFactory
@ -338,98 +345,155 @@ protected ResourceScheduler createScheduler() {
} }
} }
private static class FakeJob extends JobImpl {
public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf,
int numMaps, int numReduces) {
super(MRBuilderUtils.newJobId(appAttemptID.getApplicationId(), 0),
appAttemptID, conf, null, null, null, null, null, null, null, null,
true, null, System.currentTimeMillis(), null);
this.jobId = getID();
this.numMaps = numMaps;
this.numReduces = numReduces;
}
private float setupProgress;
private float mapProgress;
private float reduceProgress;
private float cleanupProgress;
private final int numMaps;
private final int numReduces;
private JobId jobId;
void setProgress(float setupProgress, float mapProgress,
float reduceProgress, float cleanupProgress) {
this.setupProgress = setupProgress;
this.mapProgress = mapProgress;
this.reduceProgress = reduceProgress;
this.cleanupProgress = cleanupProgress;
}
@Override
public int getTotalMaps() { return this.numMaps; }
@Override
public int getTotalReduces() { return this.numReduces;}
@Override
public JobReport getReport() {
return MRBuilderUtils.newJobReport(this.jobId, "job", "user",
JobState.RUNNING, 0, 0, 0, this.setupProgress, this.mapProgress,
this.reduceProgress, this.cleanupProgress, "jobfile", null, false);
}
}
@Test @Test
public void testReportedAppProgress() throws Exception { public void testReportedAppProgress() throws Exception {
LOG.info("Running testReportedAppProgress"); LOG.info("Running testReportedAppProgress");
Configuration conf = new Configuration(); Configuration conf = new Configuration();
MyResourceManager rm = new MyResourceManager(conf); final MyResourceManager rm = new MyResourceManager(conf);
rm.start(); rm.start();
DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext()
.getDispatcher(); .getDispatcher();
// Submit the application // Submit the application
RMApp app = rm.submitApp(1024); RMApp rmApp = rm.submitApp(1024);
dispatcher.await(); rmDispatcher.await();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); MockNM amNodeManager = rm.registerNode("amNM:1234", 21504);
amNodeManager.nodeHeartbeat(true); amNodeManager.nodeHeartbeat(true);
dispatcher.await(); rmDispatcher.await();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt()
.getAppAttemptId(); .getAppAttemptId();
rm.sendAMLaunched(appAttemptId); rm.sendAMLaunched(appAttemptId);
dispatcher.await(); rmDispatcher.await();
FakeJob job = new FakeJob(appAttemptId, conf, 2, 2); MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId(
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, 0), 10, 10, false, this.getClass().getName(), true, 1) {
appAttemptId, job); @Override
protected Dispatcher createDispatcher() {
return new DrainDispatcher();
}
protected ContainerAllocator createContainerAllocator(
ClientService clientService, AppContext context) {
return new MyContainerAllocator(rm, appAttemptId, context);
};
};
Assert.assertEquals(0.0, rmApp.getProgress(), 0.0);
mrApp.submit(conf);
Job job = mrApp.getContext().getAllJobs().entrySet().iterator().next()
.getValue();
DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher();
MyContainerAllocator allocator = (MyContainerAllocator) mrApp
.getContainerAllocator();
mrApp.waitForState(job, JobState.RUNNING);
amDispatcher.await();
// Wait till all map-attempts request for containers
for (Task t : job.getTasks().values()) {
if (t.getType() == TaskType.MAP) {
mrApp.waitForState(t.getAttempts().values().iterator().next(),
TaskAttemptState.UNASSIGNED);
}
}
amDispatcher.await();
allocator.schedule();
rmDispatcher.await();
amNodeManager.nodeHeartbeat(true);
rmDispatcher.await();
allocator.schedule();
rmDispatcher.await();
// Wait for all map-tasks to be running
for (Task t : job.getTasks().values()) {
if (t.getType() == TaskType.MAP) {
mrApp.waitForState(t, TaskState.RUNNING);
}
}
allocator.schedule(); // Send heartbeat allocator.schedule(); // Send heartbeat
dispatcher.await(); rmDispatcher.await();
Assert.assertEquals(0.0, app.getProgress(), 0.0); Assert.assertEquals(0.05f, job.getProgress(), 0.001f);
Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f);
job.setProgress(100, 10, 0, 0); // Finish off 1 map.
Iterator<Task> it = job.getTasks().values().iterator();
finishNextNTasks(mrApp, it, 1);
allocator.schedule(); allocator.schedule();
dispatcher.await(); rmDispatcher.await();
Assert.assertEquals(9.5f, app.getProgress(), 0.0); Assert.assertEquals(0.095f, job.getProgress(), 0.001f);
Assert.assertEquals(0.095f, rmApp.getProgress(), 0.001f);
job.setProgress(100, 80, 0, 0); // Finish off 7 more so that map-progress is 80%
finishNextNTasks(mrApp, it, 7);
allocator.schedule(); allocator.schedule();
dispatcher.await(); rmDispatcher.await();
Assert.assertEquals(41.0f, app.getProgress(), 0.0); Assert.assertEquals(0.41f, job.getProgress(), 0.001f);
Assert.assertEquals(0.41f, rmApp.getProgress(), 0.001f);
job.setProgress(100, 100, 20, 0); // Finish off the 2 remaining maps
allocator.schedule(); finishNextNTasks(mrApp, it, 2);
dispatcher.await();
Assert.assertEquals(59.0f, app.getProgress(), 0.0); // Wait till all reduce-attempts request for containers
for (Task t : job.getTasks().values()) {
if (t.getType() == TaskType.REDUCE) {
mrApp.waitForState(t.getAttempts().values().iterator().next(),
TaskAttemptState.UNASSIGNED);
}
}
job.setProgress(100, 100, 100, 100);
allocator.schedule(); allocator.schedule();
dispatcher.await(); rmDispatcher.await();
Assert.assertEquals(100.0f, app.getProgress(), 0.0); amNodeManager.nodeHeartbeat(true);
rmDispatcher.await();
allocator.schedule();
rmDispatcher.await();
// Wait for all reduce-tasks to be running
for (Task t : job.getTasks().values()) {
if (t.getType() == TaskType.REDUCE) {
mrApp.waitForState(t, TaskState.RUNNING);
}
}
// Finish off 2 reduces
finishNextNTasks(mrApp, it, 2);
allocator.schedule();
rmDispatcher.await();
Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
// Finish off the remaining 8 reduces.
finishNextNTasks(mrApp, it, 8);
allocator.schedule();
rmDispatcher.await();
// Remaining is JobCleanup
Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
}
private void finishNextNTasks(MRApp mrApp, Iterator<Task> it, int nextN)
throws Exception {
Task task;
for (int i=0; i<nextN; i++) {
task = it.next();
finishTask(mrApp, task);
}
}
private void finishTask(MRApp mrApp, Task task) throws Exception {
TaskAttempt attempt = task.getAttempts().values().iterator().next();
mrApp.getContext().getEventHandler().handle(
new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE));
mrApp.waitForState(task, TaskState.SUCCEEDED);
} }
@Test @Test
@ -438,46 +502,96 @@ public void testReportedAppProgressWithOnlyMaps() throws Exception {
LOG.info("Running testReportedAppProgressWithOnlyMaps"); LOG.info("Running testReportedAppProgressWithOnlyMaps");
Configuration conf = new Configuration(); Configuration conf = new Configuration();
MyResourceManager rm = new MyResourceManager(conf); final MyResourceManager rm = new MyResourceManager(conf);
rm.start(); rm.start();
DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext()
.getDispatcher(); .getDispatcher();
// Submit the application // Submit the application
RMApp app = rm.submitApp(1024); RMApp rmApp = rm.submitApp(1024);
dispatcher.await(); rmDispatcher.await();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); MockNM amNodeManager = rm.registerNode("amNM:1234", 11264);
amNodeManager.nodeHeartbeat(true); amNodeManager.nodeHeartbeat(true);
dispatcher.await(); rmDispatcher.await();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt()
.getAppAttemptId(); .getAppAttemptId();
rm.sendAMLaunched(appAttemptId); rm.sendAMLaunched(appAttemptId);
dispatcher.await(); rmDispatcher.await();
FakeJob job = new FakeJob(appAttemptId, conf, 2, 0); MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId(
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, 0), 10, 0, false, this.getClass().getName(), true, 1) {
appAttemptId, job); @Override
protected Dispatcher createDispatcher() {
return new DrainDispatcher();
}
protected ContainerAllocator createContainerAllocator(
ClientService clientService, AppContext context) {
return new MyContainerAllocator(rm, appAttemptId, context);
};
};
Assert.assertEquals(0.0, rmApp.getProgress(), 0.0);
mrApp.submit(conf);
Job job = mrApp.getContext().getAllJobs().entrySet().iterator().next()
.getValue();
DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher();
MyContainerAllocator allocator = (MyContainerAllocator) mrApp
.getContainerAllocator();
mrApp.waitForState(job, JobState.RUNNING);
amDispatcher.await();
// Wait till all map-attempts request for containers
for (Task t : job.getTasks().values()) {
mrApp.waitForState(t.getAttempts().values().iterator().next(),
TaskAttemptState.UNASSIGNED);
}
amDispatcher.await();
allocator.schedule();
rmDispatcher.await();
amNodeManager.nodeHeartbeat(true);
rmDispatcher.await();
allocator.schedule();
rmDispatcher.await();
// Wait for all map-tasks to be running
for (Task t : job.getTasks().values()) {
mrApp.waitForState(t, TaskState.RUNNING);
}
allocator.schedule(); // Send heartbeat allocator.schedule(); // Send heartbeat
dispatcher.await(); rmDispatcher.await();
Assert.assertEquals(0.0, app.getProgress(), 0.0); Assert.assertEquals(0.05f, job.getProgress(), 0.001f);
Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f);
job.setProgress(100, 10, 0, 0); Iterator<Task> it = job.getTasks().values().iterator();
allocator.schedule();
dispatcher.await();
Assert.assertEquals(14f, app.getProgress(), 0.0);
job.setProgress(100, 60, 0, 0); // Finish off 1 map so that map-progress is 10%
finishNextNTasks(mrApp, it, 1);
allocator.schedule(); allocator.schedule();
dispatcher.await(); rmDispatcher.await();
Assert.assertEquals(59.0f, app.getProgress(), 0.0); Assert.assertEquals(0.14f, job.getProgress(), 0.001f);
Assert.assertEquals(0.14f, rmApp.getProgress(), 0.001f);
job.setProgress(100, 100, 0, 100); // Finish off 5 more map so that map-progress is 60%
finishNextNTasks(mrApp, it, 5);
allocator.schedule(); allocator.schedule();
dispatcher.await(); rmDispatcher.await();
Assert.assertEquals(100.0f, app.getProgress(), 0.0); Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
// Finish off remaining map so that map-progress is 100%
finishNextNTasks(mrApp, it, 4);
allocator.schedule();
rmDispatcher.await();
Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
} }
@Test @Test
@ -1000,7 +1114,6 @@ private static class MyContainerAllocator extends RMContainerAllocator {
private MyResourceManager rm; private MyResourceManager rm;
@SuppressWarnings("rawtypes")
private static AppContext createAppContext( private static AppContext createAppContext(
ApplicationAttemptId appAttemptId, Job job) { ApplicationAttemptId appAttemptId, Job job) {
AppContext context = mock(AppContext.class); AppContext context = mock(AppContext.class);
@ -1028,7 +1141,15 @@ private static ClientService createMockClientService() {
return service; return service;
} }
MyContainerAllocator(MyResourceManager rm, Configuration conf, // Use this constructor when using a real job.
MyContainerAllocator(MyResourceManager rm,
ApplicationAttemptId appAttemptId, AppContext context) {
super(createMockClientService(), context);
this.rm = rm;
}
// Use this constructor when you are using a mocked job.
public MyContainerAllocator(MyResourceManager rm, Configuration conf,
ApplicationAttemptId appAttemptId, Job job) { ApplicationAttemptId appAttemptId, Job job) {
super(createMockClientService(), createAppContext(appAttemptId, job)); super(createMockClientService(), createAppContext(appAttemptId, job));
this.rm = rm; this.rm = rm;
@ -1090,6 +1211,7 @@ public List<TaskAttemptContainerAssignedEvent> schedule() {
return result; return result;
} }
@Override
protected void startAllocatorThread() { protected void startAllocatorThread() {
// override to NOT start thread // override to NOT start thread
} }

View File

@ -393,6 +393,11 @@ public JobReport getReport() {
throw new UnsupportedOperationException("Not supported yet."); throw new UnsupportedOperationException("Not supported yet.");
} }
@Override
public float getProgress() {
return 0;
}
@Override @Override
public Counters getCounters() { public Counters getCounters() {
throw new UnsupportedOperationException("Not supported yet."); throw new UnsupportedOperationException("Not supported yet.");

View File

@ -65,7 +65,7 @@ public void testJobNoTasksTransition() {
Task mockTask = mock(Task.class); Task mockTask = mock(Task.class);
Map<TaskId, Task> tasks = new HashMap<TaskId, Task>(); Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
tasks.put(mockTask.getID(), mockTask); tasks.put(mockTask.getID(), mockTask);
when(mockJob.getTasks()).thenReturn(tasks); mockJob.tasks = tasks;
when(mockJob.getState()).thenReturn(JobState.ERROR); when(mockJob.getState()).thenReturn(JobState.ERROR);
JobEvent mockJobEvent = mock(JobEvent.class); JobEvent mockJobEvent = mock(JobEvent.class);
@ -78,6 +78,7 @@ public void testJobNoTasksTransition() {
public void testCheckJobCompleteSuccess() { public void testCheckJobCompleteSuccess() {
JobImpl mockJob = mock(JobImpl.class); JobImpl mockJob = mock(JobImpl.class);
mockJob.tasks = new HashMap<TaskId, Task>();
OutputCommitter mockCommitter = mock(OutputCommitter.class); OutputCommitter mockCommitter = mock(OutputCommitter.class);
EventHandler mockEventHandler = mock(EventHandler.class); EventHandler mockEventHandler = mock(EventHandler.class);
JobContext mockJobContext = mock(JobContext.class); JobContext mockJobContext = mock(JobContext.class);
@ -110,7 +111,7 @@ public void testCheckJobCompleteSuccessFailed() {
Task mockTask = mock(Task.class); Task mockTask = mock(Task.class);
Map<TaskId, Task> tasks = new HashMap<TaskId, Task>(); Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
tasks.put(mockTask.getID(), mockTask); tasks.put(mockTask.getID(), mockTask);
when(mockJob.getTasks()).thenReturn(tasks); mockJob.tasks = tasks;
try { try {
// Just in case the code breaks and reaches these calls // Just in case the code breaks and reaches these calls

View File

@ -723,6 +723,8 @@ public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
* @param type the type of the task (map/reduce/setup/cleanup) * @param type the type of the task (map/reduce/setup/cleanup)
* @param state the state of the task * @param state the state of the task
* (pending/running/completed/failed/killed) * (pending/running/completed/failed/killed)
* @throws IOException when there is an error communicating with the master
* @throws IllegalArgumentException if an invalid type/state is passed
*/ */
public void displayTasks(final JobID jobId, String type, String state) public void displayTasks(final JobID jobId, String type, String state)
throws IOException { throws IOException {

View File

@ -20,6 +20,9 @@
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.HashSet;
import java.util.Arrays;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -56,6 +59,10 @@
public class CLI extends Configured implements Tool { public class CLI extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(CLI.class); private static final Log LOG = LogFactory.getLog(CLI.class);
protected Cluster cluster; protected Cluster cluster;
private final Set<String> taskTypes = new HashSet<String>(
Arrays.asList("map", "reduce", "setup", "cleanup"));
private final Set<String> taskStates = new HashSet<String>(
Arrays.asList("pending", "running", "completed", "failed", "killed"));
public CLI() { public CLI() {
} }
@ -545,9 +552,21 @@ private void printTaskAttempts(TaskReport report) {
* @param type the type of the task (map/reduce/setup/cleanup) * @param type the type of the task (map/reduce/setup/cleanup)
* @param state the state of the task * @param state the state of the task
* (pending/running/completed/failed/killed) * (pending/running/completed/failed/killed)
* @throws IOException when there is an error communicating with the master
* @throws InterruptedException
* @throws IllegalArgumentException if an invalid type/state is passed
*/ */
protected void displayTasks(Job job, String type, String state) protected void displayTasks(Job job, String type, String state)
throws IOException, InterruptedException { throws IOException, InterruptedException {
if (!taskTypes.contains(type)) {
throw new IllegalArgumentException("Invalid type: " + type +
". Valid types for task are: map, reduce, setup, cleanup.");
}
if (!taskStates.contains(state)) {
throw new java.lang.IllegalArgumentException("Invalid state: " + state +
". Valid states for task are: pending, running, completed, failed, killed.");
}
TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type)); TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type));
for (TaskReport report : reports) { for (TaskReport report : reports) {
TIPStatus status = report.getCurrentStatus(); TIPStatus status = report.getCurrentStatus();

View File

@ -135,6 +135,11 @@ public JobReport getReport() {
return report; return report;
} }
@Override
public float getProgress() {
return 1.0f;
}
@Override @Override
public JobState getState() { public JobState getState() {
return report.getJobState(); return report.getJobState();

View File

@ -89,6 +89,11 @@ public JobReport getReport() {
return jobReport; return jobReport;
} }
@Override
public float getProgress() {
return 1.0f;
}
@Override @Override
public Counters getCounters() { public Counters getCounters() {
return null; return null;

View File

@ -31,7 +31,9 @@
import java.util.Random; import java.util.Random;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -163,7 +165,8 @@ public LocalizationProtocol run() {
ExecutorService exec = null; ExecutorService exec = null;
try { try {
exec = createDownloadThreadPool(); exec = createDownloadThreadPool();
localizeFiles(nodeManager, exec, ugi); CompletionService<Path> ecs = createCompletionService(exec);
localizeFiles(nodeManager, ecs, ugi);
return 0; return 0;
} catch (Throwable e) { } catch (Throwable e) {
// Print traces to stdout so that they can be logged by the NM address // Print traces to stdout so that they can be logged by the NM address
@ -182,6 +185,10 @@ ExecutorService createDownloadThreadPool() {
.setNameFormat("ContainerLocalizer Downloader").build()); .setNameFormat("ContainerLocalizer Downloader").build());
} }
CompletionService<Path> createCompletionService(ExecutorService exec) {
return new ExecutorCompletionService<Path>(exec);
}
Callable<Path> download(LocalDirAllocator lda, LocalResource rsrc, Callable<Path> download(LocalDirAllocator lda, LocalResource rsrc,
UserGroupInformation ugi) throws IOException { UserGroupInformation ugi) throws IOException {
Path destPath = lda.getLocalPathForWrite(".", getEstimatedSize(rsrc), conf); Path destPath = lda.getLocalPathForWrite(".", getEstimatedSize(rsrc), conf);
@ -206,7 +213,8 @@ void sleep(int duration) throws InterruptedException {
} }
private void localizeFiles(LocalizationProtocol nodemanager, private void localizeFiles(LocalizationProtocol nodemanager,
ExecutorService exec, UserGroupInformation ugi) throws IOException { CompletionService<Path> cs, UserGroupInformation ugi)
throws IOException {
while (true) { while (true) {
try { try {
LocalizerStatus status = createStatus(); LocalizerStatus status = createStatus();
@ -231,7 +239,7 @@ private void localizeFiles(LocalizationProtocol nodemanager,
break; break;
} }
// TODO: Synchronization?? // TODO: Synchronization??
pendingResources.put(r, exec.submit(download(lda, r, ugi))); pendingResources.put(r, cs.submit(download(lda, r, ugi)));
} }
} }
break; break;
@ -247,8 +255,7 @@ private void localizeFiles(LocalizationProtocol nodemanager,
} catch (YarnRemoteException e) { } } catch (YarnRemoteException e) { }
return; return;
} }
// TODO HB immediately when rsrc localized cs.poll(1000, TimeUnit.MILLISECONDS);
sleep(1);
} catch (InterruptedException e) { } catch (InterruptedException e) {
return; return;
} catch (YarnRemoteException e) { } catch (YarnRemoteException e) {

View File

@ -40,6 +40,7 @@
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -146,7 +147,8 @@ public void testContainerLocalizerMain() throws Exception {
// return result instantly for deterministic test // return result instantly for deterministic test
ExecutorService syncExec = mock(ExecutorService.class); ExecutorService syncExec = mock(ExecutorService.class);
when(syncExec.submit(isA(Callable.class))) CompletionService<Path> cs = mock(CompletionService.class);
when(cs.submit(isA(Callable.class)))
.thenAnswer(new Answer<Future<Path>>() { .thenAnswer(new Answer<Future<Path>>() {
@Override @Override
public Future<Path> answer(InvocationOnMock invoc) public Future<Path> answer(InvocationOnMock invoc)
@ -159,6 +161,7 @@ public Future<Path> answer(InvocationOnMock invoc)
} }
}); });
doReturn(syncExec).when(localizer).createDownloadThreadPool(); doReturn(syncExec).when(localizer).createDownloadThreadPool();
doReturn(cs).when(localizer).createCompletionService(syncExec);
// run localization // run localization
assertEquals(0, localizer.runLocalization(nmAddr)); assertEquals(0, localizer.runLocalization(nmAddr));