HBASE-8147. Adds an integration test for HBASE-8140 (an issue observed when commitJob launched another job).

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1461303 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 2013-03-26 20:47:19 +00:00
parent 1325cd776b
commit 0d8c61cd88

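For context on the commit message above: HBASE-8140 concerns launching a second MapReduce job from an OutputCommitter's commitJob(), where the child job's dependencies are already on the classpath (expanded from the distributed cache) but are not packaged as jars (see the test javadoc further down). The following is a minimal, illustrative sketch of that pattern only; the class and job names are made up and are not part of this commit.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

// Illustrative sketch, not the test code added below: an OutputCommitter that
// submits a child job from commitJob(), the scenario HBASE-8140 dealt with.
public class ChildLaunchingCommitter extends FileOutputCommitter {

  public ChildLaunchingCommitter(Path outputPath, TaskAttemptContext context)
      throws IOException {
    super(outputPath, context);
  }

  @Override
  public void commitJob(JobContext parent) throws IOException {
    super.commitJob(parent);
    try {
      // The child runs in the parent's JVM, so its HBase dependencies are on
      // the classpath but not packaged as jars when addDependencyJars runs.
      Job child = new Job(new Configuration(parent.getConfiguration()), "hypothetical-child");
      TableMapReduceUtil.addDependencyJars(child);
      child.waitForCompletion(true);
    } catch (Exception e) {
      throw new IOException("Child job failed. Aborting commit.", e);
    }
  }
}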

@@ -16,9 +16,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.IntegrationTests;
import org.apache.hadoop.hbase.KeyValue;
@@ -27,6 +30,17 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
@@ -154,6 +168,7 @@ public class IntegrationTestImportTsv implements Configurable, Tool {
@Test
public void testGenerateAndLoad() throws Exception {
LOG.info("Running test testGenerateAndLoad.");
String table = NAME + "-" + UUID.randomUUID();
String cf = "d";
Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
@@ -179,6 +194,153 @@ public class IntegrationTestImportTsv implements Configurable, Tool {
// clean up after ourselves.
util.deleteTable(table);
util.cleanupDataTestDirOnTestFS(table);
LOG.info("testGenerateAndLoad completed successfully.");
}
//
// helper classes used in the following test.
//
/**
* A {@link FileOutputCommitter} that launches an ImportTsv job through
* its {@link #commitJob(JobContext)} method.
*/
private static class JobLaunchingOutputCommitter extends FileOutputCommitter {
public JobLaunchingOutputCommitter(Path outputPath, TaskAttemptContext context)
throws IOException {
super(outputPath, context);
}
@Override
public void commitJob(JobContext context) throws IOException {
super.commitJob(context);
// inherit jar dependencies added to distributed cache loaded by parent job
Configuration conf = HBaseConfiguration.create();
conf.set("mapred.job.classpath.archives",
context.getConfiguration().get("mapred.job.classpath.archives", ""));
conf.set("mapreduce.job.cache.archives.visibilities",
context.getConfiguration().get("mapreduce.job.cache.archives.visibilities", ""));
// can't use the IntegrationTest's util instance because it hasn't been
// instantiated in the JVM running this method. Create our own.
IntegrationTestingUtility util =
new IntegrationTestingUtility(conf);
// this is why we're here: launch a child job. The rest of this should
// look a lot like TestImportTsv#testMROnTable.
final String table = format("%s-%s-child", NAME, context.getJobID());
final String cf = "FAM";
String[] args = {
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
table
};
try {
util.createTable(table, cf);
LOG.info("testRunFromOutputCommitter: launching child job.");
TestImportTsv.doMROnTableTest(util, cf, null, args, 1);
} catch (Exception e) {
throw new IOException("Underlying MapReduce job failed. Aborting commit.", e);
} finally {
util.deleteTable(table);
}
}
}
/**
* An {@link OutputFormat} that exposes the <code>JobLaunchingOutputCommitter</code>.
*/
public static class JobLaunchingOutputFormat extends FileOutputFormat<LongWritable, Text> {
private OutputCommitter committer = null;
@Override
public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {
return new RecordWriter<LongWritable, Text>() {
@Override
public void write(LongWritable key, Text value) throws IOException,
InterruptedException {
/* do nothing */
}
@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
/* do nothing */
}
};
}
@Override
public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException {
if (committer == null) {
Path output = getOutputPath(context);
LOG.debug("Using JobLaunchingOuputCommitter.");
committer = new JobLaunchingOuputCommitter(output, context);
}
return committer;
}
}
/**
* Add classes necessary for integration-test jobs.
*/
public static void addTestDependencyJars(Configuration conf) throws IOException {
TableMapReduceUtil.addDependencyJars(conf,
org.apache.hadoop.hbase.BaseConfigurable.class, // hbase-server
HBaseTestingUtility.class, // hbase-server-test
HBaseCommonTestingUtility.class, // hbase-common-test
com.google.common.collect.ListMultimap.class, // Guava
org.cloudera.htrace.Trace.class); // HTrace
}
/**
* {@link TableMapReduceUtil#addDependencyJars(Job)} is used when
* configuring a mapreduce job to ensure dependencies of the job are shipped
* to the cluster. Sometimes those dependencies are on the classpath, but not
* packaged as a jar, for instance, when run at the end of another mapreduce
* job. In that case, dependency jars have already been shipped to the cluster
* and expanded in the parent job's run folder. This test validates the child
* job's classpath is constructed correctly under that scenario.
*/
@Test
public void testRunFromOutputCommitter() throws Exception {
LOG.info("Running test testRunFromOutputCommitter.");
FileSystem fs = FileSystem.get(getConf());
Path inputPath = new Path(util.getDataTestDirOnTestFS("parent"), "input.txt");
Path outputPath = new Path(util.getDataTestDirOnTestFS("parent"), "output");
FSDataOutputStream fout = null;
try {
fout = fs.create(inputPath, true);
fout.write(Bytes.toBytes("testRunFromOutputCommitter\n"));
LOG.debug(format("Wrote test data to file: %s", inputPath));
} finally {
if (fout != null) fout.close();
}
// Create a parent job that ships the HBase dependencies. This mirrors
// the expected calling context.
Job job = new Job(getConf(), NAME + ".testRunFromOutputCommitter - parent");
job.setJarByClass(IntegrationTestImportTsv.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(JobLaunchingOutputFormat.class);
TextInputFormat.addInputPath(job, inputPath);
JobLaunchingOutputFormat.setOutputPath(job, outputPath);
TableMapReduceUtil.addDependencyJars(job);
addTestDependencyJars(job.getConfiguration());
// Job launched by the OutputCommitter will fail if dependency jars are
// not shipped properly.
LOG.info("testRunFromOutputCommitter: launching parent job.");
assertTrue(job.waitForCompletion(true));
LOG.info("testRunFromOutputCommitter completed successfully.");
}
public int run(String[] args) throws Exception {
@@ -194,6 +356,7 @@ public class IntegrationTestImportTsv implements Configurable, Tool {
// IntegrationTestsDriver does.
provisionCluster();
testGenerateAndLoad();
testRunFromOutputCommitter();
releaseCluster();
return 0;
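The hunk ends inside run(), which drives both tests when the class is launched standalone; the file's main() is not part of this diff. For reference only, a launcher for an HBase integration test of this vintage typically looks roughly like the hypothetical sketch below (the actual entry point may differ):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical launcher, assuming the usual IntegrationTest* pattern; the
// actual main() of IntegrationTestImportTsv is not shown in this commit.
// This method would live inside the IntegrationTestImportTsv class.
public static void main(String[] args) throws Exception {
  Configuration conf = HBaseConfiguration.create();
  // Point the test at a real (distributed) cluster instead of a mini cluster.
  IntegrationTestingUtility.setUseDistributedCluster(conf);
  int status = ToolRunner.run(conf, new IntegrationTestImportTsv(), args);
  System.exit(status);
}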