MAPREDUCE-5503. Fixed a test issue in TestMRJobClient. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1526362 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-09-26 03:00:28 +00:00
parent 42c3cd3d13
commit fb48b6cdc9
3 changed files with 61 additions and 34 deletions

View File

@ -219,6 +219,8 @@ Release 2.1.2 - UNRELEASED
MAPREDUCE-5505. Clients should be notified job finished only after job MAPREDUCE-5505. Clients should be notified job finished only after job
successfully unregistered (Zhijie Shen via bikas) successfully unregistered (Zhijie Shen via bikas)
MAPREDUCE-5503. Fixed a test issue in TestMRJobClient. (Jian He via vinodkv)
Release 2.1.1-beta - 2013-09-23 Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -79,7 +79,7 @@ public class TestJobClient extends TestMRJobClient {
Configuration conf = createJobConf(); Configuration conf = createJobConf();
String jobId = runJob(); String jobId = runJob();
testGetCounter(jobId, conf); testGetCounter(jobId, conf);
testJobList(jobId, conf); testAllJobList(jobId, conf);
testChangingJobPriority(jobId, conf); testChangingJobPriority(jobId, conf);
} }

View File

@ -29,6 +29,8 @@ import java.io.PipedInputStream;
import java.io.PipedOutputStream; import java.io.PipedOutputStream;
import java.io.PrintStream; import java.io.PrintStream;
import junit.framework.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -60,6 +62,22 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
return job; return job;
} }
private Job runJobInBackGround(Configuration conf) throws Exception {
String input = "hello1\nhello2\nhello3\n";
Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
1, 1, input);
job.setJobName("mr");
job.setPriority(JobPriority.NORMAL);
job.submit();
int i = 0;
while (i++ < 200 && job.getJobID() == null) {
LOG.info("waiting for jobId...");
Thread.sleep(100);
}
return job;
}
public static int runTool(Configuration conf, Tool tool, String[] args, public static int runTool(Configuration conf, Tool tool, String[] args,
OutputStream out) throws Exception { OutputStream out) throws Exception {
PrintStream oldOut = System.out; PrintStream oldOut = System.out;
@ -108,8 +126,10 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
Job job = runJob(conf); Job job = runJob(conf);
String jobId = job.getJobID().toString(); String jobId = job.getJobID().toString();
// test jobs list // test all jobs list
testJobList(jobId, conf); testAllJobList(jobId, conf);
// test only submitted jobs list
testSubmittedJobList(conf);
// test job counter // test job counter
testGetCounter(jobId, conf); testGetCounter(jobId, conf);
// status // status
@ -131,38 +151,37 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
// submit job from file // submit job from file
testSubmit(conf); testSubmit(conf);
// kill a task // kill a task
testKillTask(job, conf); testKillTask(conf);
// fail a task // fail a task
testfailTask(job, conf); testfailTask(conf);
// kill job // kill job
testKillJob(jobId, conf); testKillJob(conf);
} }
/** /**
* test fail task * test fail task
*/ */
private void testfailTask(Job job, Configuration conf) throws Exception { private void testfailTask(Configuration conf) throws Exception {
Job job = runJobInBackGround(conf);
CLI jc = createJobClient(); CLI jc = createJobClient();
TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0); TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
TaskAttemptID taid = new TaskAttemptID(tid, 1); TaskAttemptID taid = new TaskAttemptID(tid, 1);
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
// TaskAttemptId is not set // TaskAttemptId is not set
int exitCode = runTool(conf, jc, new String[] { "-fail-task" }, out); int exitCode = runTool(conf, jc, new String[] { "-fail-task" }, out);
assertEquals("Exit code", -1, exitCode); assertEquals("Exit code", -1, exitCode);
try { runTool(conf, jc, new String[] { "-fail-task", taid.toString() }, out);
runTool(conf, jc, new String[] { "-fail-task", taid.toString() }, out); String answer = new String(out.toByteArray(), "UTF-8");
fail(" this task should field"); Assert
} catch (IOException e) { .assertTrue(answer.contains("Killed task " + taid + " by failing it"));
// task completed !
assertTrue(e.getMessage().contains("_0001_m_000000_1"));
}
} }
/** /**
* test a kill task * test a kill task
*/ */
private void testKillTask(Job job, Configuration conf) throws Exception { private void testKillTask(Configuration conf) throws Exception {
Job job = runJobInBackGround(conf);
CLI jc = createJobClient(); CLI jc = createJobClient();
TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0); TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
TaskAttemptID taid = new TaskAttemptID(tid, 1); TaskAttemptID taid = new TaskAttemptID(tid, 1);
@ -171,20 +190,17 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
int exitCode = runTool(conf, jc, new String[] { "-kill-task" }, out); int exitCode = runTool(conf, jc, new String[] { "-kill-task" }, out);
assertEquals("Exit code", -1, exitCode); assertEquals("Exit code", -1, exitCode);
try { runTool(conf, jc, new String[] { "-kill-task", taid.toString() }, out);
runTool(conf, jc, new String[] { "-kill-task", taid.toString() }, out); String answer = new String(out.toByteArray(), "UTF-8");
fail(" this task should be killed"); Assert.assertTrue(answer.contains("Killed task " + taid));
} catch (IOException e) {
System.out.println(e);
// task completed
assertTrue(e.getMessage().contains("_0001_m_000000_1"));
}
} }
/** /**
* test a kill job * test a kill job
*/ */
private void testKillJob(String jobId, Configuration conf) throws Exception { private void testKillJob(Configuration conf) throws Exception {
Job job = runJobInBackGround(conf);
String jobId = job.getJobID().toString();
CLI jc = createJobClient(); CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
@ -435,7 +451,8 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
/** /**
* print a job list * print a job list
*/ */
protected void testJobList(String jobId, Configuration conf) throws Exception { protected void testAllJobList(String jobId, Configuration conf)
throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
// bad options // bad options
@ -458,23 +475,31 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
} }
assertEquals(1, counter); assertEquals(1, counter);
out.reset(); out.reset();
// only submitted }
exitCode = runTool(conf, createJobClient(), new String[] { "-list" }, out);
protected void testSubmittedJobList(Configuration conf) throws Exception {
Job job = runJobInBackGround(conf);
ByteArrayOutputStream out = new ByteArrayOutputStream();
String line;
int counter = 0;
// only submitted
int exitCode =
runTool(conf, createJobClient(), new String[] { "-list" }, out);
assertEquals("Exit code", 0, exitCode); assertEquals("Exit code", 0, exitCode);
br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream( BufferedReader br =
out.toByteArray()))); new BufferedReader(new InputStreamReader(new ByteArrayInputStream(
out.toByteArray())));
counter = 0; counter = 0;
while ((line = br.readLine()) != null) { while ((line = br.readLine()) != null) {
LOG.info("line = " + line); LOG.info("line = " + line);
if (line.contains(jobId)) { if (line.contains(job.getJobID().toString())) {
counter++; counter++;
} }
} }
// all jobs submitted! no current // all jobs submitted! no current
assertEquals(1, counter); assertEquals(1, counter);
} }
protected void verifyJobPriority(String jobId, String priority, protected void verifyJobPriority(String jobId, String priority,
Configuration conf, CLI jc) throws Exception { Configuration conf, CLI jc) throws Exception {
PipedInputStream pis = new PipedInputStream(); PipedInputStream pis = new PipedInputStream();