Merge -c 1236588 from trunk to branch-0.23 to fix MAPREDUCE-3720. Changed bin/mapred job -list to not print job-specific information not available at RM.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1236589 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ef144fd7a0
commit
f1b833c8a3
|
@ -546,6 +546,9 @@ Release 0.23.1 - Unreleased
|
|||
MAPREDUCE-3735. Add distcp jar to the distribution (tar).
|
||||
(mahadev)
|
||||
|
||||
MAPREDUCE-3720. Changed bin/mapred job -list to not print job-specific
|
||||
information not available at RM. (vinodkv via acmurthy)
|
||||
|
||||
Release 0.23.0 - 2011-11-01
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -455,10 +455,14 @@ public class Job extends JobContextImpl implements JobContext {
|
|||
public String toString() {
|
||||
ensureState(JobState.RUNNING);
|
||||
String reasonforFailure = " ";
|
||||
int numMaps = 0;
|
||||
int numReduces = 0;
|
||||
try {
|
||||
updateStatus();
|
||||
if (status.getState().equals(JobStatus.State.FAILED))
|
||||
reasonforFailure = getTaskFailureEventString();
|
||||
numMaps = getTaskReports(TaskType.MAP).length;
|
||||
numReduces = getTaskReports(TaskType.REDUCE).length;
|
||||
} catch (IOException e) {
|
||||
} catch (InterruptedException ie) {
|
||||
}
|
||||
|
@ -468,6 +472,8 @@ public class Job extends JobContextImpl implements JobContext {
|
|||
sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
|
||||
sb.append("\n");
|
||||
sb.append("Uber job : ").append(status.isUber()).append("\n");
|
||||
sb.append("Number of maps: ").append(numMaps);
|
||||
sb.append("Number of reduces: ").append(numReduces);
|
||||
sb.append("map() completion: ");
|
||||
sb.append(status.getMapProgress()).append("\n");
|
||||
sb.append("reduce() completion: ");
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.mapreduce.tools;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -25,6 +26,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
@ -560,25 +562,28 @@ public class CLI extends Configured implements Tool {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void displayJobList(JobStatus[] jobs)
|
||||
throws IOException, InterruptedException {
|
||||
System.out.println("Total jobs:" + jobs.length);
|
||||
System.out.println("JobId\tState\tStartTime\t" +
|
||||
"UserName\tQueue\tPriority\tMaps\tReduces\tUsedContainers\t" +
|
||||
"RsvdContainers\tUsedMem\tRsvdMem\tNeededMem\tAM info");
|
||||
for (JobStatus job : jobs) {
|
||||
TaskReport[] mapReports =
|
||||
cluster.getJob(job.getJobID()).getTaskReports(TaskType.MAP);
|
||||
TaskReport[] reduceReports =
|
||||
cluster.getJob(job.getJobID()).getTaskReports(TaskType.REDUCE);
|
||||
displayJobList(jobs, new PrintWriter(System.out));
|
||||
}
|
||||
|
||||
System.out.printf("%s\t%s\t%d\t%s\t%s\t%s\t%d\t%d\t%d\t%d\t%dM\t%dM\t%dM\t%s\n",
|
||||
@Private
|
||||
public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
|
||||
@Private
|
||||
public static String dataPattern = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%14d\t%14d\t%7dM\t%7sM\t%9dM\t%10s\n";
|
||||
|
||||
@Private
|
||||
public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
|
||||
writer.println("Total jobs:" + jobs.length);
|
||||
writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName",
|
||||
"Queue", "Priority", "UsedContainers",
|
||||
"RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
|
||||
for (JobStatus job : jobs) {
|
||||
writer.printf(dataPattern,
|
||||
job.getJobID().toString(), job.getState(), job.getStartTime(),
|
||||
job.getUsername(), job.getQueue(),
|
||||
job.getPriority().name(),
|
||||
mapReports.length,
|
||||
reduceReports.length,
|
||||
job.getNumUsedSlots(),
|
||||
job.getNumReservedSlots(),
|
||||
job.getUsedMem(),
|
||||
|
@ -586,6 +591,7 @@ public class CLI extends Configured implements Tool {
|
|||
job.getNeededMem(),
|
||||
job.getSchedulingInfo());
|
||||
}
|
||||
writer.flush();
|
||||
}
|
||||
|
||||
public static void main(String[] argv) throws Exception {
|
||||
|
|
|
@ -22,19 +22,24 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.mockito.Matchers.isA;
|
||||
import static org.mockito.Mockito.atLeastOnce;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import java.io.PrintWriter;
|
||||
|
||||
import org.apache.hadoop.mapreduce.Cluster;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.JobPriority;
|
||||
import org.apache.hadoop.mapreduce.JobStatus;
|
||||
import org.apache.hadoop.mapreduce.TaskType;
|
||||
import org.apache.hadoop.mapreduce.TaskReport;
|
||||
import org.apache.hadoop.mapreduce.TaskType;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public class JobClientUnitTest {
|
||||
|
||||
public class TestJobClient extends JobClient {
|
||||
|
@ -48,7 +53,6 @@ public class JobClientUnitTest {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
public void testMapTaskReportsWithNullJob() throws Exception {
|
||||
TestJobClient client = new TestJobClient(new JobConf());
|
||||
|
@ -64,7 +68,6 @@ public class JobClientUnitTest {
|
|||
verify(mockCluster).getJob(id);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
public void testReduceTaskReportsWithNullJob() throws Exception {
|
||||
TestJobClient client = new TestJobClient(new JobConf());
|
||||
|
@ -80,7 +83,6 @@ public class JobClientUnitTest {
|
|||
verify(mockCluster).getJob(id);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
public void testSetupTaskReportsWithNullJob() throws Exception {
|
||||
TestJobClient client = new TestJobClient(new JobConf());
|
||||
|
@ -96,7 +98,6 @@ public class JobClientUnitTest {
|
|||
verify(mockCluster).getJob(id);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
public void testCleanupTaskReportsWithNullJob() throws Exception {
|
||||
TestJobClient client = new TestJobClient(new JobConf());
|
||||
|
@ -115,12 +116,15 @@ public class JobClientUnitTest {
|
|||
@Test
|
||||
public void testShowJob() throws Exception {
|
||||
TestJobClient client = new TestJobClient(new JobConf());
|
||||
JobID jobID = new JobID("test", 0);
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
JobID jobID = new JobID(String.valueOf(startTime), 12345);
|
||||
|
||||
JobStatus mockJobStatus = mock(JobStatus.class);
|
||||
when(mockJobStatus.getJobID()).thenReturn(jobID);
|
||||
when(mockJobStatus.getState()).thenReturn(JobStatus.State.RUNNING);
|
||||
when(mockJobStatus.getStartTime()).thenReturn(0L);
|
||||
when(mockJobStatus.getStartTime()).thenReturn(startTime);
|
||||
when(mockJobStatus.getUsername()).thenReturn("mockuser");
|
||||
when(mockJobStatus.getQueue()).thenReturn("mockqueue");
|
||||
when(mockJobStatus.getPriority()).thenReturn(JobPriority.NORMAL);
|
||||
|
@ -132,18 +136,21 @@ public class JobClientUnitTest {
|
|||
when(mockJobStatus.getSchedulingInfo()).thenReturn("NA");
|
||||
|
||||
Job mockJob = mock(Job.class);
|
||||
when(mockJob.getTaskReports(isA(TaskType.class))).thenReturn(new TaskReport[0]);
|
||||
when(mockJob.getTaskReports(isA(TaskType.class))).thenReturn(
|
||||
new TaskReport[5]);
|
||||
|
||||
Cluster mockCluster = mock(Cluster.class);
|
||||
when(mockCluster.getJob(jobID)).thenReturn(mockJob);
|
||||
|
||||
client.setCluster(mockCluster);
|
||||
|
||||
|
||||
client.displayJobList(new JobStatus[] {mockJobStatus});
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
client.displayJobList(new JobStatus[] {mockJobStatus}, new PrintWriter(out));
|
||||
String commandLineOutput = out.toString();
|
||||
System.out.println(commandLineOutput);
|
||||
Assert.assertTrue(commandLineOutput.contains("Total jobs:1"));
|
||||
|
||||
verify(mockJobStatus, atLeastOnce()).getJobID();
|
||||
verify(mockJob, atLeastOnce()).getTaskReports(isA(TaskType.class));
|
||||
verify(mockCluster, atLeastOnce()).getJob(jobID);
|
||||
verify(mockJobStatus).getState();
|
||||
verify(mockJobStatus).getStartTime();
|
||||
verify(mockJobStatus).getUsername();
|
||||
|
@ -155,5 +162,9 @@ public class JobClientUnitTest {
|
|||
verify(mockJobStatus).getReservedMem();
|
||||
verify(mockJobStatus).getNeededMem();
|
||||
verify(mockJobStatus).getSchedulingInfo();
|
||||
|
||||
// This call should not go to each AM.
|
||||
verify(mockCluster, never()).getJob(jobID);
|
||||
verify(mockJob, never()).getTaskReports(isA(TaskType.class));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue