MAPREDUCE-5015. Coverage fix for org.apache.hadoop.mapreduce.tools.CLI (Aleksey Gorshkov via tgraves)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1468484 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2013-04-16 16:12:25 +00:00
parent 16d41cabf9
commit fd6956c3bd
5 changed files with 470 additions and 90 deletions

View File

@ -712,6 +712,9 @@ Release 0.23.8 - UNRELEASED
BUG FIXES BUG FIXES
MAPREDUCE-5015. Coverage fix for org.apache.hadoop.mapreduce.tools.CLI
(Aleksey Gorshkov via tgraves)
Release 0.23.7 - UNRELEASED Release 0.23.7 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer; import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
import org.apache.hadoop.mapreduce.v2.LogParams; import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.logaggregation.LogDumper; import org.apache.hadoop.yarn.logaggregation.LogDumper;
@ -310,6 +311,7 @@ public int run(String[] argv) throws Exception {
exitCode = 0; exitCode = 0;
} else if (displayTasks) { } else if (displayTasks) {
displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState); displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState);
exitCode = 0;
} else if(killTask) { } else if(killTask) {
TaskAttemptID taskID = TaskAttemptID.forName(taskid); TaskAttemptID taskID = TaskAttemptID.forName(taskid);
Job job = cluster.getJob(taskID.getJobID()); Job job = cluster.getJob(taskID.getJobID());
@ -607,6 +609,6 @@ public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
public static void main(String[] argv) throws Exception { public static void main(String[] argv) throws Exception {
int res = ToolRunner.run(new CLI(), argv); int res = ToolRunner.run(new CLI(), argv);
System.exit(res); ExitUtil.terminate(res);
} }
} }

View File

@ -117,6 +117,16 @@
</profiles> </profiles>
<build> <build>
<plugins> <plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<exclude>src/test/java/org/apache/hadoop/cli/data60bytes</exclude>
<exclude>src/test/resources/job_1329348432655_0001-10.jhist</exclude>
</excludes>
</configuration>
</plugin>
<plugin> <plugin>
<artifactId>maven-jar-plugin</artifactId> <artifactId>maven-jar-plugin</artifactId>
<executions> <executions>
@ -148,15 +158,6 @@
</additionalClasspathElements> </additionalClasspathElements>
</configuration> </configuration>
</plugin> </plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<exclude>src/test/java/org/apache/hadoop/cli/data60bytes</exclude>
</excludes>
</configuration>
</plugin>
</plugins> </plugins>
</build> </build>
</project> </project>

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.mapreduce; package org.apache.hadoop.mapreduce;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.File; import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.OutputStream; import java.io.OutputStream;
@ -30,31 +32,37 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterMapReduceTestCase; import org.apache.hadoop.mapred.ClusterMapReduceTestCase;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.tools.CLI; import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.junit.Ignore; /**
import org.junit.Test; test CLI class. CLI class implemented the Tool interface.
@Ignore Here test that CLI sends correct command with options and parameters.
*/
public class TestMRJobClient extends ClusterMapReduceTestCase { public class TestMRJobClient extends ClusterMapReduceTestCase {
private static final Log LOG = LogFactory.getLog(TestMRJobClient.class); private static final Log LOG = LogFactory.getLog(TestMRJobClient.class);
private Job runJob(Configuration conf) throws Exception { private Job runJob(Configuration conf) throws Exception {
String input = "hello1\nhello2\nhello3\n"; String input = "hello1\nhello2\nhello3\n";
Job job = MapReduceTestUtil.createJob(conf, Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
getInputDir(), getOutputDir(), 1, 1, input); 1, 1, input);
job.setJobName("mr"); job.setJobName("mr");
job.setPriority(JobPriority.HIGH); job.setPriority(JobPriority.NORMAL);
job.waitForCompletion(true); job.waitForCompletion(true);
return job; return job;
} }
public static int runTool(Configuration conf, Tool tool, public static int runTool(Configuration conf, Tool tool, String[] args,
String[] args, OutputStream out) throws Exception { OutputStream out) throws Exception {
PrintStream oldOut = System.out; PrintStream oldOut = System.out;
PrintStream newOut = new PrintStream(out, true); PrintStream newOut = new PrintStream(out, true);
try { try {
@ -65,47 +73,409 @@ public static int runTool(Configuration conf, Tool tool,
} }
} }
@Test private static class BadOutputFormat extends TextOutputFormat<Object, Object> {
@Override
public void checkOutputSpecs(JobContext job) throws IOException {
throw new IOException();
}
}
public void testJobSubmissionSpecsAndFiles() throws Exception {
Configuration conf = createJobConf();
Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
1, 1);
job.setOutputFormatClass(BadOutputFormat.class);
try {
job.submit();
fail("Should've thrown an exception while checking output specs.");
} catch (Exception e) {
assertTrue(e instanceof IOException);
}
Cluster cluster = new Cluster(conf);
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,
job.getConfiguration());
Path submitJobDir = new Path(jobStagingArea, "JobId");
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
assertFalse("Shouldn't have created a job file if job specs failed.",
FileSystem.get(conf).exists(submitJobFile));
}
/**
* main test method
*/
public void testJobClient() throws Exception { public void testJobClient() throws Exception {
Configuration conf = createJobConf(); Configuration conf = createJobConf();
Job job = runJob(conf); Job job = runJob(conf);
String jobId = job.getJobID().toString(); String jobId = job.getJobID().toString();
testGetCounter(jobId, conf); // test jobs list
testJobList(jobId, conf); testJobList(jobId, conf);
// test job counter
testGetCounter(jobId, conf);
// status
testJobStatus(jobId, conf);
// test list of events
testJobEvents(jobId, conf);
// test job history
testJobHistory(conf);
// test tracker list
testListTrackers(conf);
// attempts list
testListAttemptIds(jobId, conf);
// black list
testListBlackList(conf);
// test method main and help screen
startStop();
// test a change job priority .
testChangingJobPriority(jobId, conf); testChangingJobPriority(jobId, conf);
// submit job from file
testSubmit(conf);
// kill a task
testKillTask(job, conf);
// fail a task
testfailTask(job, conf);
// kill job
testKillJob(jobId, conf);
} }
@Test /**
public void testGetCounter(String jobId, * test fail task
Configuration conf) throws Exception { */
private void testfailTask(Job job, Configuration conf) throws Exception {
CLI jc = createJobClient();
TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
TaskAttemptID taid = new TaskAttemptID(tid, 1);
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
// TaskAttemptId is not set
int exitCode = runTool(conf, jc, new String[] { "-fail-task" }, out);
assertEquals("Exit code", -1, exitCode);
try {
runTool(conf, jc, new String[] { "-fail-task", taid.toString() }, out);
fail(" this task should field");
} catch (YarnRemoteException e) {
// task completed !
assertTrue(e.getMessage().contains("_0001_m_000000_1"));
}
}
/**
* test a kill task
*/
private void testKillTask(Job job, Configuration conf) throws Exception {
CLI jc = createJobClient();
TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
TaskAttemptID taid = new TaskAttemptID(tid, 1);
ByteArrayOutputStream out = new ByteArrayOutputStream();
// bad parameters
int exitCode = runTool(conf, jc, new String[] { "-kill-task" }, out);
assertEquals("Exit code", -1, exitCode);
try {
runTool(conf, jc, new String[] { "-kill-task", taid.toString() }, out);
fail(" this task should be killed");
} catch (YarnRemoteException e) {
// task completed
assertTrue(e.getMessage().contains("_0001_m_000000_1"));
}
}
/**
* test a kill job
*/
private void testKillJob(String jobId, Configuration conf) throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
// without jobId
int exitCode = runTool(conf, jc, new String[] { "-kill" }, out);
assertEquals("Exit code", -1, exitCode);
// good parameters
exitCode = runTool(conf, jc, new String[] { "-kill", jobId }, out);
assertEquals("Exit code", 0, exitCode);
String answer = new String(out.toByteArray(), "UTF-8");
assertTrue(answer.contains("Killed job " + jobId));
}
/**
* test submit task from file
*/
private void testSubmit(Configuration conf) throws Exception {
CLI jc = createJobClient();
Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
1, 1, "ping");
job.setJobName("mr");
job.setPriority(JobPriority.NORMAL);
File fcon = File.createTempFile("config", ".xml");
job.getConfiguration().writeXml(new FileOutputStream(fcon));
ByteArrayOutputStream out = new ByteArrayOutputStream();
// bad parameters
int exitCode = runTool(conf, jc, new String[] { "-submit" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc,
new String[] { "-submit", "file://" + fcon.getAbsolutePath() }, out);
assertEquals("Exit code", 0, exitCode);
String answer = new String(out.toByteArray());
// in console was written
assertTrue(answer.contains("Created job "));
}
/**
* test start form console command without options
*/
private void startStop() {
ByteArrayOutputStream data = new ByteArrayOutputStream();
PrintStream error = System.err;
System.setErr(new PrintStream(data));
ExitUtil.disableSystemExit();
try {
CLI.main(new String[0]);
fail(" CLI.main should call System.exit");
} catch (ExitUtil.ExitException e) {
ExitUtil.resetFirstExitException();
assertEquals(-1, e.status);
} catch (Exception e) {
} finally {
System.setErr(error);
}
// in console should be written help text
String s = new String(data.toByteArray());
assertTrue(s.contains("-submit"));
assertTrue(s.contains("-status"));
assertTrue(s.contains("-kill"));
assertTrue(s.contains("-set-priority"));
assertTrue(s.contains("-events"));
assertTrue(s.contains("-history"));
assertTrue(s.contains("-list"));
assertTrue(s.contains("-list-active-trackers"));
assertTrue(s.contains("-list-blacklisted-trackers"));
assertTrue(s.contains("-list-attempt-ids"));
assertTrue(s.contains("-kill-task"));
assertTrue(s.contains("-fail-task"));
assertTrue(s.contains("-logs"));
}
/**
* black list
*/
private void testListBlackList(Configuration conf) throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
int exitCode = runTool(conf, jc, new String[] {
"-list-blacklisted-trackers", "second in" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] { "-list-blacklisted-trackers" },
out);
assertEquals("Exit code", 0, exitCode);
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
int counter = 0;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
counter++;
}
assertEquals(0, counter);
}
/**
* print AttemptIds list
*/
private void testListAttemptIds(String jobId, Configuration conf)
throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
int exitCode = runTool(conf, jc, new String[] { "-list-attempt-ids" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] { "-list-attempt-ids", jobId,
"MAP", "completed" }, out);
assertEquals("Exit code", 0, exitCode);
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
int counter = 0;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
counter++;
}
assertEquals(1, counter);
}
/**
* print tracker list
*/
private void testListTrackers(Configuration conf) throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
int exitCode = runTool(conf, jc, new String[] { "-list-active-trackers",
"second parameter" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] { "-list-active-trackers" }, out);
assertEquals("Exit code", 0, exitCode);
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
int counter = 0;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
counter++;
}
assertEquals(2, counter);
}
/**
* print job history from file
*/
private void testJobHistory(Configuration conf) throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
File f = new File("src/test/resources/job_1329348432655_0001-10.jhist");
// bad command
int exitCode = runTool(conf, jc, new String[] { "-history", "pul",
"file://" + f.getAbsolutePath() }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] { "-history", "all",
"file://" + f.getAbsolutePath() }, out);
assertEquals("Exit code", 0, exitCode);
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
int counter = 0;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
if (line.startsWith("task_")) {
counter++;
}
}
assertEquals(23, counter);
}
/**
* print job events list
*/
private void testJobEvents(String jobId, Configuration conf) throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
int exitCode = runTool(conf, jc, new String[] { "-events" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] { "-events", jobId, "0", "100" },
out);
assertEquals("Exit code", 0, exitCode);
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
int counter = 0;
String attemptId = ("attempt" + jobId.substring(3));
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
if (line.contains(attemptId)) {
counter++;
}
}
assertEquals(2, counter);
}
/**
* print job status
*/
private void testJobStatus(String jobId, Configuration conf) throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
// bad options
int exitCode = runTool(conf, jc, new String[] { "-status" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] { "-status", jobId }, out);
assertEquals("Exit code", 0, exitCode);
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
if (!line.contains("Job state:")) {
continue;
}
break;
}
assertNotNull(line);
assertTrue(line.contains("SUCCEEDED"));
}
/**
* print counters
*/
public void testGetCounter(String jobId, Configuration conf) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
// bad command
int exitCode = runTool(conf, createJobClient(), int exitCode = runTool(conf, createJobClient(),
new String[] { "-counter", }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, createJobClient(),
new String[] { "-counter", jobId, new String[] { "-counter", jobId,
"org.apache.hadoop.mapreduce.TaskCounter", "MAP_INPUT_RECORDS" }, "org.apache.hadoop.mapreduce.TaskCounter", "MAP_INPUT_RECORDS" },
out); out);
assertEquals("Exit code", 0, exitCode); assertEquals("Exit code", 0, exitCode);
assertEquals("Counter", "3", out.toString().trim()); assertEquals("Counter", "3", out.toString().trim());
} }
/**
* print a job list
*/
protected void testJobList(String jobId, Configuration conf) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
// bad options
int exitCode = runTool(conf, createJobClient(), new String[] { "-list",
"alldata" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, createJobClient(),
// all jobs
new String[] { "-list", "all" }, out);
assertEquals("Exit code", 0, exitCode);
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
String line;
int counter = 0;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
if (line.contains(jobId)) {
counter++;
}
}
assertEquals(1, counter);
out.reset();
// only submitted
exitCode = runTool(conf, createJobClient(), new String[] { "-list" }, out);
assertEquals("Exit code", 0, exitCode);
br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(
out.toByteArray())));
counter = 0;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
if (line.contains(jobId)) {
counter++;
}
}
// all jobs submitted! no current
assertEquals(1, counter);
@Test
public void testJobList(String jobId,
Configuration conf) throws Exception {
verifyJobPriority(jobId, "HIGH", conf, createJobClient());
} }
protected void verifyJobPriority(String jobId, String priority, protected void verifyJobPriority(String jobId, String priority,
Configuration conf, CLI jc) throws Exception { Configuration conf, CLI jc) throws Exception {
PipedInputStream pis = new PipedInputStream(); PipedInputStream pis = new PipedInputStream();
PipedOutputStream pos = new PipedOutputStream(pis); PipedOutputStream pos = new PipedOutputStream(pis);
int exitCode = runTool(conf, jc, int exitCode = runTool(conf, jc, new String[] { "-list", "all" }, pos);
new String[] { "-list", "all" },
pos);
assertEquals("Exit code", 0, exitCode); assertEquals("Exit code", 0, exitCode);
BufferedReader br = new BufferedReader(new InputStreamReader(pis)); BufferedReader br = new BufferedReader(new InputStreamReader(pis));
String line = null; String line;
while ((line = br.readLine()) != null) { while ((line = br.readLine()) != null) {
LOG.info("line = " + line); LOG.info("line = " + line);
if (!line.startsWith(jobId)) { if (!line.contains(jobId)) {
continue; continue;
} }
assertTrue(line.contains(priority)); assertTrue(line.contains(priority));
@ -114,63 +484,16 @@ protected void verifyJobPriority(String jobId, String priority,
pis.close(); pis.close();
} }
@Test
public void testChangingJobPriority(String jobId, Configuration conf) public void testChangingJobPriority(String jobId, Configuration conf)
throws Exception { throws Exception {
int exitCode = runTool(conf, createJobClient(), int exitCode = runTool(conf, createJobClient(),
new String[] { "-set-priority", jobId, "VERY_LOW" }, new String[] { "-set-priority" }, new ByteArrayOutputStream());
new ByteArrayOutputStream()); assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, createJobClient(), new String[] { "-set-priority",
jobId, "VERY_LOW" }, new ByteArrayOutputStream());
assertEquals("Exit code", 0, exitCode); assertEquals("Exit code", 0, exitCode);
verifyJobPriority(jobId, "VERY_LOW", conf, createJobClient()); // because this method does not implemented still.
} verifyJobPriority(jobId, "NORMAL", conf, createJobClient());
@Test
public void testMissingProfileOutput() throws Exception {
Configuration conf = createJobConf();
final String input = "hello1\n";
// Set a job to be profiled with an empty agentlib parameter.
// This will fail to create profile.out files for tasks.
// This will succeed by skipping the HTTP fetch of the
// profiler output.
Job job = MapReduceTestUtil.createJob(conf,
getInputDir(), getOutputDir(), 1, 1, input);
job.setJobName("disable-profile-fetch");
job.setProfileEnabled(true);
job.setProfileParams("-agentlib:,verbose=n,file=%s");
job.setMaxMapAttempts(1);
job.setMaxReduceAttempts(1);
job.setJobSetupCleanupNeeded(false);
job.waitForCompletion(true);
// Run another job with an hprof agentlib param; verify
// that the HTTP fetch works here.
Job job2 = MapReduceTestUtil.createJob(conf,
getInputDir(), getOutputDir(), 1, 1, input);
job2.setJobName("enable-profile-fetch");
job2.setProfileEnabled(true);
job2.setProfileParams(
"-agentlib:hprof=cpu=samples,heap=sites,force=n,"
+ "thread=y,verbose=n,file=%s");
job2.setProfileTaskRange(true, "0-1");
job2.setProfileTaskRange(false, "");
job2.setMaxMapAttempts(1);
job2.setMaxReduceAttempts(1);
job2.setJobSetupCleanupNeeded(false);
job2.waitForCompletion(true);
// Find the first map task, verify that we got its profile output file.
TaskReport [] reports = job2.getTaskReports(TaskType.MAP);
assertTrue("No task reports found!", reports.length > 0);
TaskReport report = reports[0];
TaskID id = report.getTaskId();
assertTrue(TaskType.MAP == id.getTaskType());
System.out.println("Using task id: " + id);
TaskAttemptID attemptId = new TaskAttemptID(id, 0);
File profileOutFile = new File(attemptId.toString() + ".profile");
assertTrue("Couldn't find profiler output", profileOutFile.exists());
assertTrue("Couldn't remove profiler output", profileOutFile.delete());
} }
protected CLI createJobClient() throws IOException { protected CLI createJobClient() throws IOException {