HBASE-12008 Remove IntegrationTestImportTsv#testRunFromOutputCommitter

parent 97a458010d
commit 895768dd70

@@ -33,37 +33,20 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
 import org.apache.hadoop.hbase.IntegrationTests;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -226,161 +209,6 @@ public class IntegrationTestImportTsv implements Configurable, Tool {
     LOG.info("testGenerateAndLoad completed successfully.");
   }
-
-  //
-  // helper classes used in the following test.
-  //
-
-  /**
-   * A {@link FileOutputCommitter} that launches an ImportTsv job through
-   * its {@link #commitJob(JobContext)} method.
-   */
-  private static class JobLaunchingOuputCommitter extends FileOutputCommitter {
-
-    public JobLaunchingOuputCommitter(Path outputPath, TaskAttemptContext context)
-        throws IOException {
-      super(outputPath, context);
-    }
-
-    @Override
-    public void commitJob(JobContext context) throws IOException {
-      super.commitJob(context);
-
-      // inherit jar dependencies added to distributed cache loaded by parent job
-      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
-      conf.set("mapreduce.job.classpath.archives",
-          context.getConfiguration().get("mapreduce.job.classpath.archives", ""));
-      conf.set("mapreduce.job.cache.archives.visibilities",
-          context.getConfiguration().get("mapreduce.job.cache.archives.visibilities", ""));
-
-      // can't use IntegrationTest instance of util because it hasn't been
-      // instantiated on the JVM running this method. Create our own.
-      IntegrationTestingUtility util =
-          new IntegrationTestingUtility(conf);
-
-      // this is why we're here: launch a child job. The rest of this should
-      // look a lot like TestImportTsv#testMROnTable.
-      final String table = format("%s-%s-child", NAME, context.getJobID());
-      final String cf = "FAM";
-      String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-      conf.set(ImportTsv.CREDENTIALS_LOCATION, fileLocation);
-      String[] args = {
-          "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
-          "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
-          table
-      };
-
-      try {
-        util.createTable(table, cf);
-        LOG.info("testRunFromOutputCommitter: launching child job.");
-        TestImportTsv.doMROnTableTest(util, cf, null, args, 1);
-      } catch (Exception e) {
-        throw new IOException("Underlying MapReduce job failed. Aborting commit.", e);
-      } finally {
-        if (util.getHBaseAdmin().tableExists(TableName.valueOf(table))) {
-          util.deleteTable(table);
-        }
-      }
-    }
-  }
-
-  /**
-   * An {@link OutputFormat} that exposes the <code>JobLaunchingOutputCommitter</code>.
-   */
-  public static class JobLaunchingOutputFormat extends FileOutputFormat<LongWritable, Text> {
-
-    private OutputCommitter committer = null;
-
-    @Override
-    public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job)
-        throws IOException, InterruptedException {
-      return new RecordWriter<LongWritable, Text>() {
-        @Override
-        public void write(LongWritable key, Text value) throws IOException,
-            InterruptedException {
-          /* do nothing */
-        }
-
-        @Override
-        public void close(TaskAttemptContext context) throws IOException,
-            InterruptedException {
-          /* do nothing */
-        }
-      };
-    }
-
-    @Override
-    public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
-        throws IOException {
-      if (committer == null) {
-        Path output = getOutputPath(context);
-        LOG.debug("Using JobLaunchingOuputCommitter.");
-        committer = new JobLaunchingOuputCommitter(output, context);
-      }
-      return committer;
-    }
-  }
-
-  /**
-   * Add classes necessary for integration-test jobs.
-   */
-  public static void addTestDependencyJars(Configuration conf) throws IOException {
-    TableMapReduceUtil.addDependencyJars(conf,
-        org.apache.hadoop.hbase.BaseConfigurable.class, // hbase-server
-        HBaseTestingUtility.class, // hbase-server-test
-        HBaseCommonTestingUtility.class, // hbase-common-test
-        com.google.common.collect.ListMultimap.class, // Guava
-        org.htrace.Trace.class); // HTrace
-  }
-
-  /**
-   * {@link TableMapReduceUtil#addDependencyJars(Job)} is used when
-   * configuring a mapreduce job to ensure dependencies of the job are shipped
-   * to the cluster. Sometimes those dependencies are on the classpath, but not
-   * packaged as a jar, for instance, when run at the end of another mapreduce
-   * job. In that case, dependency jars have already been shipped to the cluster
-   * and expanded in the parent job's run folder. This test validates the child
-   * job's classpath is constructed correctly under that scenario.
-   */
-  @Test
-  public void testRunFromOutputCommitter() throws Exception {
-    LOG.info("Running test testRunFromOutputCommitter.");
-
-    FileSystem fs = FileSystem.get(getConf());
-    Path inputPath = new Path(util.getDataTestDirOnTestFS("parent"), "input.txt");
-    Path outputPath = new Path(util.getDataTestDirOnTestFS("parent"), "output");
-    FSDataOutputStream fout = null;
-    try {
-      fout = fs.create(inputPath, true);
-      fout.write(Bytes.toBytes("testRunFromOutputCommitter\n"));
-      LOG.debug(format("Wrote test data to file: %s", inputPath));
-    } finally {
-      if (fout != null) {
-        fout.close();
-      }
-    }
-
-    // create a parent job that ships the HBase dependencies. This is
-    // accurate as the expected calling context.
-    Job job = new Job(getConf(), NAME + ".testRunFromOutputCommitter - parent");
-    job.setJarByClass(IntegrationTestImportTsv.class);
-    job.setInputFormatClass(TextInputFormat.class);
-    job.setOutputFormatClass(JobLaunchingOutputFormat.class);
-    TextInputFormat.addInputPath(job, inputPath);
-    JobLaunchingOutputFormat.setOutputPath(job, outputPath);
-    TableMapReduceUtil.addDependencyJars(job);
-    addTestDependencyJars(job.getConfiguration());
-    TableMapReduceUtil.initCredentials(job);
-    JobClient jc = new JobClient(job.getConfiguration());
-    job.getCredentials().addToken(new Text("my_mr_token"),
-        jc.getDelegationToken(new Text("renewer")));
-
-    // Job launched by the OutputCommitter will fail if dependency jars are
-    // not shipped properly.
-    LOG.info("testRunFromOutputCommitter: launching parent job.");
-    assertTrue(job.waitForCompletion(true));
-    LOG.info("testRunFromOutputCommitter completed successfully.");
-  }
 
   public int run(String[] args) throws Exception {
     if (args.length != 0) {
       System.err.println(format("%s [genericOptions]", NAME));
@@ -394,7 +222,6 @@ public class IntegrationTestImportTsv implements Configurable, Tool {
     // IntegrationTestsDriver does.
     provisionCluster();
     testGenerateAndLoad();
-    testRunFromOutputCommitter();
     releaseCluster();
 
     return 0;
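
For context on what the deleted test exercised: a client-side MapReduce job ships its HBase dependency jars to the cluster before submission, and the removed test verified that this still worked when the submitting JVM was itself a task of a parent job. Below is a minimal sketch of the ordinary client-side pattern, not part of this commit; the class and job names are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class DependencyShippingSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "dependency-shipping-sketch");
    job.setJarByClass(DependencyShippingSketch.class);
    // Ships the jars containing HBase's classes (and their transitive
    // dependencies) via the distributed cache, so tasks running on the
    // cluster can load them without a pre-installed HBase classpath.
    TableMapReduceUtil.addDependencyJars(job);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}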