MAPREDUCE-5015. Coverage fix for org.apache.hadoop.mapreduce.tools.CLI (Aleksey Gorshkov via tgraves)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1468483 13f79535-47bb-0310-9956-ffa450edef68
parent 1822529e88
commit 9ea20fc537
@@ -861,6 +861,9 @@ Release 0.23.8 - UNRELEASED
 
   BUG FIXES
 
+    MAPREDUCE-5015. Coverage fix for org.apache.hadoop.mapreduce.tools.CLI
+    (Aleksey Gorshkov via tgraves)
+
 Release 0.23.7 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
 import org.apache.hadoop.mapreduce.v2.LogParams;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.logaggregation.LogDumper;
@@ -64,8 +65,6 @@ import com.google.common.base.Charsets;
 public class CLI extends Configured implements Tool {
   private static final Log LOG = LogFactory.getLog(CLI.class);
   protected Cluster cluster;
-  private final Set<String> taskTypes = new HashSet<String>(
-      Arrays.asList("map", "reduce", "setup", "cleanup"));
   private final Set<String> taskStates = new HashSet<String>(
       Arrays.asList("pending", "running", "completed", "failed", "killed"));
 
@@ -317,6 +316,7 @@ public class CLI extends Configured implements Tool {
       exitCode = 0;
     } else if (displayTasks) {
       displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState);
+      exitCode = 0;
     } else if(killTask) {
       TaskAttemptID taskID = TaskAttemptID.forName(taskid);
       Job job = cluster.getJob(taskID.getJobID());
@@ -563,16 +563,18 @@ public class CLI extends Configured implements Tool {
    */
   protected void displayTasks(Job job, String type, String state)
       throws IOException, InterruptedException {
-    if (!taskTypes.contains(type)) {
-      throw new IllegalArgumentException("Invalid type: " + type +
-          ". Valid types for task are: map, reduce, setup, cleanup.");
-    }
     if (!taskStates.contains(state)) {
       throw new java.lang.IllegalArgumentException("Invalid state: " + state +
           ". Valid states for task are: pending, running, completed, failed, killed.");
     }
-    TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type));
+    TaskReport[] reports=null;
+    try{
+      reports = job.getTaskReports(TaskType.valueOf(type));
+    }catch(IllegalArgumentException e){
+      throw new IllegalArgumentException("Invalid type: " + type +
+          ". Valid types for task are: MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, TASK_CLEANUP.");
+    }
     for (TaskReport report : reports) {
       TIPStatus status = report.getCurrentStatus();
       if ((state.equals("pending") && status ==TIPStatus.PENDING) ||
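Reviewer note: the hunk above drops the hand-maintained taskTypes set and instead lets TaskType.valueOf(type) do the validation, wrapping its IllegalArgumentException in a CLI-friendly message, so the accepted names are now the enum constants (MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, TASK_CLEANUP) and, since valueOf is case-sensitive, the old lower-case names are no longer accepted. A minimal sketch of the same pattern outside of CLI; the helper class and method names here are illustrative, not part of the patch:

    import org.apache.hadoop.mapreduce.TaskType;

    // Hypothetical helper, not part of the patch: maps a user-supplied string to a
    // TaskType and rewords the enum's IllegalArgumentException for CLI users.
    class TaskTypeParser {
      static TaskType parse(String type) {
        try {
          return TaskType.valueOf(type);   // accepts MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, TASK_CLEANUP
        } catch (IllegalArgumentException e) {
          throw new IllegalArgumentException("Invalid type: " + type
              + ". Valid types for task are: MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, TASK_CLEANUP.");
        }
      }
    }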
@@ -626,6 +628,6 @@ public class CLI extends Configured implements Tool {
 
   public static void main(String[] argv) throws Exception {
     int res = ToolRunner.run(new CLI(), argv);
-    System.exit(res);
+    ExitUtil.terminate(res);
   }
 }
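Reviewer note: switching main() from System.exit to ExitUtil.terminate is what makes the new startStop() test later in this diff possible: a test can call ExitUtil.disableSystemExit() and assert on the recorded exit status instead of having the JVM killed. A rough sketch of that pattern, using only the ExitUtil calls that appear in this patch:

    import org.apache.hadoop.util.ExitUtil;

    // Illustrative snippet, not part of the patch.
    public class ExitUtilSketch {
      public static void main(String[] args) {
        ExitUtil.disableSystemExit();          // make terminate() throw instead of exiting
        try {
          ExitUtil.terminate(-1);              // what CLI.main() now does on bad usage
        } catch (ExitUtil.ExitException e) {
          System.out.println("would have exited with status " + e.status);
          ExitUtil.resetFirstExitException();  // clean up for the next test
        }
      }
    }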
@@ -154,6 +154,7 @@
         <configuration>
           <excludes>
             <exclude>src/test/java/org/apache/hadoop/cli/data60bytes</exclude>
+            <exclude>src/test/resources/job_1329348432655_0001-10.jhist</exclude>
           </excludes>
         </configuration>
       </plugin>
@@ -18,8 +18,10 @@
 package org.apache.hadoop.mapreduce;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
@@ -30,35 +32,37 @@ import java.io.PrintStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterMapReduceTestCase;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.tools.CLI;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 
-import org.junit.Ignore;
-import org.junit.Test;
-@Ignore
+/**
+ * Tests the CLI class, which implements the Tool interface.
+ * Verifies that CLI issues the correct command for the given options and parameters.
+ */
 public class TestMRJobClient extends ClusterMapReduceTestCase {
 
   private static final Log LOG = LogFactory.getLog(TestMRJobClient.class);
 
   private Job runJob(Configuration conf) throws Exception {
     String input = "hello1\nhello2\nhello3\n";
 
-    Job job = MapReduceTestUtil.createJob(conf,
-        getInputDir(), getOutputDir(), 1, 1, input);
+    Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
+        1, 1, input);
     job.setJobName("mr");
-    job.setPriority(JobPriority.HIGH);
+    job.setPriority(JobPriority.NORMAL);
     job.waitForCompletion(true);
     return job;
   }
 
-  public static int runTool(Configuration conf, Tool tool,
-      String[] args, OutputStream out) throws Exception {
+  public static int runTool(Configuration conf, Tool tool, String[] args,
+      OutputStream out) throws Exception {
     PrintStream oldOut = System.out;
     PrintStream newOut = new PrintStream(out, true);
     try {
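Reviewer note: runTool(...) above is the harness the new tests lean on. It temporarily swaps System.out for a PrintStream over a caller-supplied OutputStream, runs the Tool via ToolRunner, and restores stdout, so assertions can be made on whatever the CLI printed. A hedged sketch of how a caller might use it; the Configuration is assumed to point at a live (mini)cluster and the job id is a placeholder:

    import java.io.ByteArrayOutputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TestMRJobClient;
    import org.apache.hadoop.mapreduce.tools.CLI;

    // Illustrative usage only; conf and jobId are placeholders, not part of the patch.
    public class RunToolSketch {
      static String capture(Configuration conf, String jobId) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int exitCode = TestMRJobClient.runTool(conf, new CLI(),
            new String[] { "-status", jobId }, out);
        // exitCode mirrors what ToolRunner.run returned; out holds the CLI's stdout.
        return "exit=" + exitCode + "\n" + new String(out.toByteArray(), "UTF-8");
      }
    }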
@@ -69,20 +73,17 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
     }
   }
 
-  private static class BadOutputFormat
-      extends TextOutputFormat {
+  private static class BadOutputFormat extends TextOutputFormat<Object, Object> {
     @Override
-    public void checkOutputSpecs(JobContext job)
-        throws FileAlreadyExistsException, IOException {
+    public void checkOutputSpecs(JobContext job) throws IOException {
       throw new IOException();
     }
   }
 
-  @Test
   public void testJobSubmissionSpecsAndFiles() throws Exception {
     Configuration conf = createJobConf();
-    Job job = MapReduceTestUtil.createJob(conf,
-        getInputDir(), getOutputDir(), 1, 1);
+    Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
+        1, 1);
     job.setOutputFormatClass(BadOutputFormat.class);
     try {
       job.submit();
@@ -90,60 +91,392 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
     } catch (Exception e) {
       assertTrue(e instanceof IOException);
     }
-    JobID jobId = job.getJobID();
     Cluster cluster = new Cluster(conf);
-    Path jobStagingArea = JobSubmissionFiles.getStagingDir(
-        cluster,
+    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,
         job.getConfiguration());
-    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
+    Path submitJobDir = new Path(jobStagingArea, "JobId");
     Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
-    assertFalse(
-        "Shouldn't have created a job file if job specs failed.",
-        FileSystem.get(conf).exists(submitJobFile)
-    );
+    assertFalse("Shouldn't have created a job file if job specs failed.",
+        FileSystem.get(conf).exists(submitJobFile));
   }
 
-  @Test
+  /**
+   * main test method
+   */
   public void testJobClient() throws Exception {
     Configuration conf = createJobConf();
     Job job = runJob(conf);
+
     String jobId = job.getJobID().toString();
-    testGetCounter(jobId, conf);
+    // test jobs list
     testJobList(jobId, conf);
+    // test job counter
+    testGetCounter(jobId, conf);
+    // status
+    testJobStatus(jobId, conf);
+    // test list of events
+    testJobEvents(jobId, conf);
+    // test job history
+    testJobHistory(conf);
+    // test tracker list
+    testListTrackers(conf);
+    // attempts list
+    testListAttemptIds(jobId, conf);
+    // black list
+    testListBlackList(conf);
+    // test method main and help screen
+    startStop();
+    // test changing job priority
     testChangingJobPriority(jobId, conf);
+    // submit job from file
+    testSubmit(conf);
+    // kill a task
+    testKillTask(job, conf);
+    // fail a task
+    testfailTask(job, conf);
+    // kill job
+    testKillJob(jobId, conf);
+
   }
 
-  @Test
-  public void testGetCounter(String jobId,
-      Configuration conf) throws Exception {
+  /**
+   * test fail task
+   */
+  private void testfailTask(Job job, Configuration conf) throws Exception {
+    CLI jc = createJobClient();
+    TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
+    TaskAttemptID taid = new TaskAttemptID(tid, 1);
     ByteArrayOutputStream out = new ByteArrayOutputStream();
+    // TaskAttemptId is not set
+    int exitCode = runTool(conf, jc, new String[] { "-fail-task" }, out);
+    assertEquals("Exit code", -1, exitCode);
+
+    try {
+      runTool(conf, jc, new String[] { "-fail-task", taid.toString() }, out);
+      fail("this task should fail");
+    } catch (YarnRemoteException e) {
+      // task completed!
+      assertTrue(e.getMessage().contains("_0001_m_000000_1"));
+    }
+  }
+  /**
+   * test a kill task
+   */
+  private void testKillTask(Job job, Configuration conf) throws Exception {
+    CLI jc = createJobClient();
+    TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
+    TaskAttemptID taid = new TaskAttemptID(tid, 1);
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    // bad parameters
+    int exitCode = runTool(conf, jc, new String[] { "-kill-task" }, out);
+    assertEquals("Exit code", -1, exitCode);
+
+    try {
+      runTool(conf, jc, new String[] { "-kill-task", taid.toString() }, out);
+      fail("this task should be killed");
+    } catch (YarnRemoteException e) {
+      // task completed
+      assertTrue(e.getMessage().contains("_0001_m_000000_1"));
+    }
+  }
+
+  /**
+   * test a kill job
+   */
+  private void testKillJob(String jobId, Configuration conf) throws Exception {
+    CLI jc = createJobClient();
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    // without jobId
+    int exitCode = runTool(conf, jc, new String[] { "-kill" }, out);
+    assertEquals("Exit code", -1, exitCode);
+    // good parameters
+    exitCode = runTool(conf, jc, new String[] { "-kill", jobId }, out);
+    assertEquals("Exit code", 0, exitCode);
+
+    String answer = new String(out.toByteArray(), "UTF-8");
+    assertTrue(answer.contains("Killed job " + jobId));
+  }
+
+  /**
+   * test submitting a job from a file
+   */
+  private void testSubmit(Configuration conf) throws Exception {
+    CLI jc = createJobClient();
+
+    Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
+        1, 1, "ping");
+    job.setJobName("mr");
+    job.setPriority(JobPriority.NORMAL);
+
+    File fcon = File.createTempFile("config", ".xml");
+
+    job.getConfiguration().writeXml(new FileOutputStream(fcon));
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    // bad parameters
+    int exitCode = runTool(conf, jc, new String[] { "-submit" }, out);
+    assertEquals("Exit code", -1, exitCode);
+
+    exitCode = runTool(conf, jc,
+        new String[] { "-submit", "file://" + fcon.getAbsolutePath() }, out);
+    assertEquals("Exit code", 0, exitCode);
+    String answer = new String(out.toByteArray());
+    // the console output should mention the created job
+    assertTrue(answer.contains("Created job "));
+  }
+  /**
+   * test start from the console command without options
+   */
+  private void startStop() {
+    ByteArrayOutputStream data = new ByteArrayOutputStream();
+    PrintStream error = System.err;
+    System.setErr(new PrintStream(data));
+    ExitUtil.disableSystemExit();
+    try {
+      CLI.main(new String[0]);
+      fail("CLI.main should call System.exit");
+
+    } catch (ExitUtil.ExitException e) {
+      ExitUtil.resetFirstExitException();
+      assertEquals(-1, e.status);
+    } catch (Exception e) {
+
+    } finally {
+      System.setErr(error);
+    }
+    // the help text should have been written to the console
+    String s = new String(data.toByteArray());
+    assertTrue(s.contains("-submit"));
+    assertTrue(s.contains("-status"));
+    assertTrue(s.contains("-kill"));
+    assertTrue(s.contains("-set-priority"));
+    assertTrue(s.contains("-events"));
+    assertTrue(s.contains("-history"));
+    assertTrue(s.contains("-list"));
+    assertTrue(s.contains("-list-active-trackers"));
+    assertTrue(s.contains("-list-blacklisted-trackers"));
+    assertTrue(s.contains("-list-attempt-ids"));
+    assertTrue(s.contains("-kill-task"));
+    assertTrue(s.contains("-fail-task"));
+    assertTrue(s.contains("-logs"));
+
+  }
+  /**
+   * black list
+   */
+  private void testListBlackList(Configuration conf) throws Exception {
+    CLI jc = createJobClient();
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    int exitCode = runTool(conf, jc, new String[] {
+        "-list-blacklisted-trackers", "second in" }, out);
+    assertEquals("Exit code", -1, exitCode);
+    exitCode = runTool(conf, jc, new String[] { "-list-blacklisted-trackers" },
+        out);
+    assertEquals("Exit code", 0, exitCode);
+    String line;
+    BufferedReader br = new BufferedReader(new InputStreamReader(
+        new ByteArrayInputStream(out.toByteArray())));
+    int counter = 0;
+    while ((line = br.readLine()) != null) {
+      LOG.info("line = " + line);
+      counter++;
+    }
+    assertEquals(0, counter);
+  }
+  /**
+   * print AttemptIds list
+   */
+  private void testListAttemptIds(String jobId, Configuration conf)
+      throws Exception {
+    CLI jc = createJobClient();
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    int exitCode = runTool(conf, jc, new String[] { "-list-attempt-ids" }, out);
+    assertEquals("Exit code", -1, exitCode);
+    exitCode = runTool(conf, jc, new String[] { "-list-attempt-ids", jobId,
+        "MAP", "completed" }, out);
+    assertEquals("Exit code", 0, exitCode);
+    String line;
+    BufferedReader br = new BufferedReader(new InputStreamReader(
+        new ByteArrayInputStream(out.toByteArray())));
+    int counter = 0;
+    while ((line = br.readLine()) != null) {
+      LOG.info("line = " + line);
+      counter++;
+    }
+    assertEquals(1, counter);
+  }
+  /**
+   * print tracker list
+   */
+  private void testListTrackers(Configuration conf) throws Exception {
+    CLI jc = createJobClient();
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    int exitCode = runTool(conf, jc, new String[] { "-list-active-trackers",
+        "second parameter" }, out);
+    assertEquals("Exit code", -1, exitCode);
+    exitCode = runTool(conf, jc, new String[] { "-list-active-trackers" }, out);
+    assertEquals("Exit code", 0, exitCode);
+    String line;
+    BufferedReader br = new BufferedReader(new InputStreamReader(
+        new ByteArrayInputStream(out.toByteArray())));
+    int counter = 0;
+    while ((line = br.readLine()) != null) {
+      LOG.info("line = " + line);
+      counter++;
+    }
+    assertEquals(2, counter);
+  }
+  /**
+   * print job history from file
+   */
+  private void testJobHistory(Configuration conf) throws Exception {
+    CLI jc = createJobClient();
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    File f = new File("src/test/resources/job_1329348432655_0001-10.jhist");
+    // bad command
+    int exitCode = runTool(conf, jc, new String[] { "-history", "pul",
+        "file://" + f.getAbsolutePath() }, out);
+    assertEquals("Exit code", -1, exitCode);
+
+    exitCode = runTool(conf, jc, new String[] { "-history", "all",
+        "file://" + f.getAbsolutePath() }, out);
+    assertEquals("Exit code", 0, exitCode);
+    String line;
+    BufferedReader br = new BufferedReader(new InputStreamReader(
+        new ByteArrayInputStream(out.toByteArray())));
+    int counter = 0;
+    while ((line = br.readLine()) != null) {
+      LOG.info("line = " + line);
+      if (line.startsWith("task_")) {
+        counter++;
+      }
+    }
+    assertEquals(23, counter);
+  }
+  /**
+   * print job events list
+   */
+  private void testJobEvents(String jobId, Configuration conf) throws Exception {
+    CLI jc = createJobClient();
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    int exitCode = runTool(conf, jc, new String[] { "-events" }, out);
+    assertEquals("Exit code", -1, exitCode);
+
+    exitCode = runTool(conf, jc, new String[] { "-events", jobId, "0", "100" },
+        out);
+    assertEquals("Exit code", 0, exitCode);
+    String line;
+    BufferedReader br = new BufferedReader(new InputStreamReader(
+        new ByteArrayInputStream(out.toByteArray())));
+    int counter = 0;
+    String attemptId = ("attempt" + jobId.substring(3));
+    while ((line = br.readLine()) != null) {
+      LOG.info("line = " + line);
+      if (line.contains(attemptId)) {
+        counter++;
+      }
+    }
+    assertEquals(2, counter);
+  }
+  /**
+   * print job status
+   */
+  private void testJobStatus(String jobId, Configuration conf) throws Exception {
+    CLI jc = createJobClient();
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    // bad options
+    int exitCode = runTool(conf, jc, new String[] { "-status" }, out);
+    assertEquals("Exit code", -1, exitCode);
+
+    exitCode = runTool(conf, jc, new String[] { "-status", jobId }, out);
+    assertEquals("Exit code", 0, exitCode);
+    String line;
+    BufferedReader br = new BufferedReader(new InputStreamReader(
+        new ByteArrayInputStream(out.toByteArray())));
+
+    while ((line = br.readLine()) != null) {
+      LOG.info("line = " + line);
+      if (!line.contains("Job state:")) {
+        continue;
+      }
+      break;
+    }
+    assertNotNull(line);
+    assertTrue(line.contains("SUCCEEDED"));
+  }
+  /**
+   * print counters
+   */
+  public void testGetCounter(String jobId, Configuration conf) throws Exception {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    // bad command
     int exitCode = runTool(conf, createJobClient(),
+        new String[] { "-counter", }, out);
+    assertEquals("Exit code", -1, exitCode);
+
+    exitCode = runTool(conf, createJobClient(),
         new String[] { "-counter", jobId,
         "org.apache.hadoop.mapreduce.TaskCounter", "MAP_INPUT_RECORDS" },
         out);
     assertEquals("Exit code", 0, exitCode);
     assertEquals("Counter", "3", out.toString().trim());
   }
+  /**
+   * print a job list
+   */
+  protected void testJobList(String jobId, Configuration conf) throws Exception {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    // bad options
+
+    int exitCode = runTool(conf, createJobClient(), new String[] { "-list",
+        "alldata" }, out);
+    assertEquals("Exit code", -1, exitCode);
+    exitCode = runTool(conf, createJobClient(),
+        // all jobs
+        new String[] { "-list", "all" }, out);
+    assertEquals("Exit code", 0, exitCode);
+    BufferedReader br = new BufferedReader(new InputStreamReader(
+        new ByteArrayInputStream(out.toByteArray())));
+    String line;
+    int counter = 0;
+    while ((line = br.readLine()) != null) {
+      LOG.info("line = " + line);
+      if (line.contains(jobId)) {
+        counter++;
+      }
+    }
+    assertEquals(1, counter);
+    out.reset();
+    // only submitted
+    exitCode = runTool(conf, createJobClient(), new String[] { "-list" }, out);
+    assertEquals("Exit code", 0, exitCode);
+    br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(
+        out.toByteArray())));
+    counter = 0;
+    while ((line = br.readLine()) != null) {
+      LOG.info("line = " + line);
+      if (line.contains(jobId)) {
+        counter++;
+      }
+    }
+    // all jobs are already submitted; none is current
+    assertEquals(1, counter);
+
-  @Test
-  public void testJobList(String jobId,
-      Configuration conf) throws Exception {
-    verifyJobPriority(jobId, "HIGH", conf, createJobClient());
   }
 
   protected void verifyJobPriority(String jobId, String priority,
       Configuration conf, CLI jc) throws Exception {
     PipedInputStream pis = new PipedInputStream();
     PipedOutputStream pos = new PipedOutputStream(pis);
-    int exitCode = runTool(conf, jc,
-        new String[] { "-list", "all" },
-        pos);
+    int exitCode = runTool(conf, jc, new String[] { "-list", "all" }, pos);
     assertEquals("Exit code", 0, exitCode);
     BufferedReader br = new BufferedReader(new InputStreamReader(pis));
-    String line = null;
+    String line;
     while ((line = br.readLine()) != null) {
       LOG.info("line = " + line);
-      if (!line.startsWith(jobId)) {
+      if (!line.contains(jobId)) {
         continue;
       }
       assertTrue(line.contains(priority));
@@ -152,63 +485,16 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
     pis.close();
   }
 
-  @Test
   public void testChangingJobPriority(String jobId, Configuration conf)
       throws Exception {
     int exitCode = runTool(conf, createJobClient(),
-        new String[] { "-set-priority", jobId, "VERY_LOW" },
-        new ByteArrayOutputStream());
+        new String[] { "-set-priority" }, new ByteArrayOutputStream());
+    assertEquals("Exit code", -1, exitCode);
+    exitCode = runTool(conf, createJobClient(), new String[] { "-set-priority",
+        jobId, "VERY_LOW" }, new ByteArrayOutputStream());
     assertEquals("Exit code", 0, exitCode);
-    verifyJobPriority(jobId, "VERY_LOW", conf, createJobClient());
-  }
-
-  @Test
-  public void testMissingProfileOutput() throws Exception {
-    Configuration conf = createJobConf();
-    final String input = "hello1\n";
-
-    // Set a job to be profiled with an empty agentlib parameter.
-    // This will fail to create profile.out files for tasks.
-    // This will succeed by skipping the HTTP fetch of the
-    // profiler output.
-    Job job = MapReduceTestUtil.createJob(conf,
-        getInputDir(), getOutputDir(), 1, 1, input);
-    job.setJobName("disable-profile-fetch");
-    job.setProfileEnabled(true);
-    job.setProfileParams("-agentlib:,verbose=n,file=%s");
-    job.setMaxMapAttempts(1);
-    job.setMaxReduceAttempts(1);
-    job.setJobSetupCleanupNeeded(false);
-    job.waitForCompletion(true);
-
-    // Run another job with an hprof agentlib param; verify
-    // that the HTTP fetch works here.
-    Job job2 = MapReduceTestUtil.createJob(conf,
-        getInputDir(), getOutputDir(), 1, 1, input);
-    job2.setJobName("enable-profile-fetch");
-    job2.setProfileEnabled(true);
-    job2.setProfileParams(
-        "-agentlib:hprof=cpu=samples,heap=sites,force=n,"
-        + "thread=y,verbose=n,file=%s");
-    job2.setProfileTaskRange(true, "0-1");
-    job2.setProfileTaskRange(false, "");
-    job2.setMaxMapAttempts(1);
-    job2.setMaxReduceAttempts(1);
-    job2.setJobSetupCleanupNeeded(false);
-    job2.waitForCompletion(true);
-
-    // Find the first map task, verify that we got its profile output file.
-    TaskReport [] reports = job2.getTaskReports(TaskType.MAP);
-    assertTrue("No task reports found!", reports.length > 0);
-    TaskReport report = reports[0];
-    TaskID id = report.getTaskId();
-    assertTrue(TaskType.MAP == id.getTaskType());
-    System.out.println("Using task id: " + id);
-    TaskAttemptID attemptId = new TaskAttemptID(id, 0);
-
-    File profileOutFile = new File(attemptId.toString() + ".profile");
-    assertTrue("Couldn't find profiler output", profileOutFile.exists());
-    assertTrue("Couldn't remove profiler output", profileOutFile.delete());
+    // set-priority is not implemented yet, so the priority stays NORMAL
+    verifyJobPriority(jobId, "NORMAL", conf, createJobClient());
   }
 
   protected CLI createJobClient() throws IOException {
File diff suppressed because one or more lines are too long