diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
index 7fe51711fa1..b13d670d606 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
@@ -33,37 +33,20 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -226,161 +209,6 @@ public class IntegrationTestImportTsv implements Configurable, Tool {
     LOG.info("testGenerateAndLoad completed successfully.");
   }
 
-  //
-  // helper classes used in the following test.
-  //
-
-  /**
-   * A {@link FileOutputCommitter} that launches an ImportTsv job through
-   * its {@link #commitJob(JobContext)} method.
-   */
-  private static class JobLaunchingOuputCommitter extends FileOutputCommitter {
-
-    public JobLaunchingOuputCommitter(Path outputPath, TaskAttemptContext context)
-        throws IOException {
-      super(outputPath, context);
-    }
-
-    @Override
-    public void commitJob(JobContext context) throws IOException {
-      super.commitJob(context);
-
-      // inherit jar dependencies added to distributed cache loaded by parent job
-      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
-      conf.set("mapreduce.job.classpath.archives",
-        context.getConfiguration().get("mapreduce.job.classpath.archives", ""));
-      conf.set("mapreduce.job.cache.archives.visibilities",
-        context.getConfiguration().get("mapreduce.job.cache.archives.visibilities", ""));
-
-      // can't use IntegrationTest instance of util because it hasn't been
-      // instantiated on the JVM running this method. Create our own.
-      IntegrationTestingUtility util =
-          new IntegrationTestingUtility(conf);
-
-      // this is why we're here: launch a child job. The rest of this should
-      // look a lot like TestImportTsv#testMROnTable.
-      final String table = format("%s-%s-child", NAME, context.getJobID());
-      final String cf = "FAM";
-      String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-      conf.set(ImportTsv.CREDENTIALS_LOCATION, fileLocation);
-      String[] args = {
-          "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
-          "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
-          table
-      };
-
-      try {
-        util.createTable(table, cf);
-        LOG.info("testRunFromOutputCommitter: launching child job.");
-        TestImportTsv.doMROnTableTest(util, cf, null, args, 1);
-      } catch (Exception e) {
-        throw new IOException("Underlying MapReduce job failed. Aborting commit.", e);
-      } finally {
-        if (util.getHBaseAdmin().tableExists(TableName.valueOf(table))) {
-          util.deleteTable(table);
-        }
-      }
-    }
-  }
-
-  /**
-   * An {@link OutputFormat} that exposes the JobLaunchingOutputCommitter.
-   */
-  public static class JobLaunchingOutputFormat extends FileOutputFormat<LongWritable, Text> {
-
-    private OutputCommitter committer = null;
-
-    @Override
-    public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job)
-        throws IOException, InterruptedException {
-      return new RecordWriter<LongWritable, Text>() {
-        @Override
-        public void write(LongWritable key, Text value) throws IOException,
-            InterruptedException {
-          /* do nothing */
-        }
-
-        @Override
-        public void close(TaskAttemptContext context) throws IOException,
-            InterruptedException {
-          /* do nothing */
-        }
-      };
-    }
-
-    @Override
-    public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
-        throws IOException {
-      if (committer == null) {
-        Path output = getOutputPath(context);
-        LOG.debug("Using JobLaunchingOuputCommitter.");
-        committer = new JobLaunchingOuputCommitter(output, context);
-      }
-      return committer;
-    }
-  }
-
-  /**
-   * Add classes necessary for integration-test jobs.
-   */
-  public static void addTestDependencyJars(Configuration conf) throws IOException {
-    TableMapReduceUtil.addDependencyJars(conf,
-      org.apache.hadoop.hbase.BaseConfigurable.class, // hbase-server
-      HBaseTestingUtility.class, // hbase-server-test
-      HBaseCommonTestingUtility.class, // hbase-common-test
-      com.google.common.collect.ListMultimap.class, // Guava
-      org.htrace.Trace.class); // HTrace
-  }
-
-  /**
-   * {@link TableMapReduceUtil#addDependencyJars(Job)} is used when
-   * configuring a mapreduce job to ensure dependencies of the job are shipped
-   * to the cluster. Sometimes those dependencies are on the classpath, but not
-   * packaged as a jar, for instance, when run at the end of another mapreduce
-   * job. In that case, dependency jars have already been shipped to the cluster
-   * and expanded in the parent job's run folder. This test validates the child
-   * job's classpath is constructed correctly under that scenario.
-   */
-  @Test
-  public void testRunFromOutputCommitter() throws Exception {
-    LOG.info("Running test testRunFromOutputCommitter.");
-
-    FileSystem fs = FileSystem.get(getConf());
-    Path inputPath = new Path(util.getDataTestDirOnTestFS("parent"), "input.txt");
-    Path outputPath = new Path(util.getDataTestDirOnTestFS("parent"), "output");
-    FSDataOutputStream fout = null;
-    try {
-      fout = fs.create(inputPath, true);
-      fout.write(Bytes.toBytes("testRunFromOutputCommitter\n"));
-      LOG.debug(format("Wrote test data to file: %s", inputPath));
-    } finally {
-      if (fout != null) {
-        fout.close();
-      }
-    }
-
-    // create a parent job that ships the HBase dependencies. This is
-    // accurate as the expected calling context.
-    Job job = new Job(getConf(), NAME + ".testRunFromOutputCommitter - parent");
-    job.setJarByClass(IntegrationTestImportTsv.class);
-    job.setInputFormatClass(TextInputFormat.class);
-    job.setOutputFormatClass(JobLaunchingOutputFormat.class);
-    TextInputFormat.addInputPath(job, inputPath);
-    JobLaunchingOutputFormat.setOutputPath(job, outputPath);
-    TableMapReduceUtil.addDependencyJars(job);
-    addTestDependencyJars(job.getConfiguration());
-    TableMapReduceUtil.initCredentials(job);
-    JobClient jc = new JobClient(job.getConfiguration());
-    job.getCredentials().addToken(new Text("my_mr_token"),
-      jc.getDelegationToken(new Text("renewer")));
-
-    // Job launched by the OutputCommitter will fail if dependency jars are
-    // not shipped properly.
-    LOG.info("testRunFromOutputCommitter: launching parent job.");
-    assertTrue(job.waitForCompletion(true));
-    LOG.info("testRunFromOutputCommitter completed successfully.");
-  }
-
   public int run(String[] args) throws Exception {
     if (args.length != 0) {
       System.err.println(format("%s [genericOptions]", NAME));
@@ -394,7 +222,6 @@ public class IntegrationTestImportTsv implements Configurable, Tool {
     // IntegrationTestsDriver does.
     provisionCluster();
     testGenerateAndLoad();
-    testRunFromOutputCommitter();
     releaseCluster();
     return 0;