diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
index d87c4fe1760..fd5f6b5506a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Result;
@@ -78,7 +79,7 @@ public class Export {
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
     job.setOutputKeyClass(ImmutableBytesWritable.class);
     job.setOutputValueClass(Result.class);
-    FileOutputFormat.setOutputPath(job, outputDir);
+    FileOutputFormat.setOutputPath(job, outputDir); // the job conf doesn't contain the cluster conf, so it has no default fs.
     return job;
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 57e6acb6171..0a5406f82c7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1648,6 +1648,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
 
     forceChangeTaskLogDir();
 
+    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
+    // We raise the allowed virtual-memory ratio so that processes don't get killed.
+    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
+
     // Allow the user to override FS URI for this map-reduce cluster to use.
     mrCluster = new MiniMRCluster(servers, FS_URI != null ?
       FS_URI : FileSystem.get(conf).getUri().toString(), 1,
@@ -1656,6 +1660,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     if (jobConf == null) {
       jobConf = mrCluster.createJobConf();
     }
+    jobConf.set("mapred.local.dir", conf.get("mapred.local.dir")); //Hadoop MiniMR overwrites this while it should not
     LOG.info("Mini mapreduce cluster started");
 
@@ -1664,14 +1669,14 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     conf.set("mapred.job.tracker", jobConf.get("mapred.job.tracker"));
     // this for mrv2 support; mr1 ignores this
     conf.set("mapreduce.framework.name", "yarn");
-    String rmAdress = jobConf.get("yarn.resourcemanager.address");
-    if (rmAdress != null) {
-      conf.set("yarn.resourcemanager.address", rmAdress);
+    String rmAddress = jobConf.get("yarn.resourcemanager.address");
+    if (rmAddress != null) {
+      conf.set("yarn.resourcemanager.address", rmAddress);
     }
-    String schedulerAdress =
+    String schedulerAddress =
       jobConf.get("yarn.resourcemanager.scheduler.address");
-    if (schedulerAdress != null) {
-      conf.set("yarn.resourcemanager.scheduler.address", schedulerAdress);
+    if (schedulerAddress != null) {
+      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
index 138e8ee0d2b..23c14c8ccc5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
@@ -51,9 +50,10 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import java.util.Iterator;
-import java.util.Map;
 
+/**
+ * Tests the table import and table export MR job functionality
+ */
 @Category(MediumTests.class)
 public class TestImportExport {
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@@ -65,15 +65,16 @@ public class TestImportExport {
   private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
   private static final byte[] QUAL = Bytes.toBytes("q");
   private static final String OUTPUT_DIR = "outputdir";
+  private static String FQ_OUTPUT_DIR;
   private static final String EXPORT_BATCH_SIZE = "100";
 
-  private static MiniHBaseCluster cluster;
   private static long now = System.currentTimeMillis();
 
   @BeforeClass
   public static void beforeClass() throws Exception {
-    cluster = UTIL.startMiniCluster();
+    UTIL.startMiniCluster();
     UTIL.startMiniMapReduceCluster();
+    FQ_OUTPUT_DIR = new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
   }
 
   @AfterClass
@@ -90,24 +91,39 @@ public class TestImportExport {
   }
 
   /**
-   * When running on Hadoop 2, we need to copy (or add) configuration values for keys
-   * that start with "yarn." (from the map reduce minicluster) to the
-   * configuration that will be used during the test (from the HBase minicluster).
-   * YARN configuration values are set properly in the map reduce minicluster,
-   * but not necessarily in the HBase mini cluster.
-   * @param srcConf the configuration to copy from (the map reduce minicluster version)
-   * @param destConf the configuration to copy to (the HBase minicluster version)
+   * Runs an export job with the specified command line args
+   * @param args
+   * @return true if job completed successfully
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
    */
-  private void copyConfigurationValues(Configuration srcConf, Configuration destConf) {
-    Iterator<Map.Entry<String, String>> it = srcConf.iterator();
-    while (it.hasNext()) {
-      Map.Entry<String, String> entry = it.next();
-      String key = entry.getKey();
-      String value = entry.getValue();
-      if (key.startsWith("yarn.") && !value.isEmpty()) {
-        destConf.set(key, value);
-      }
-    }
+  boolean runExport(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
+    // need to make a copy of the configuration so that different temp dirs are used.
+    GenericOptionsParser opts = new GenericOptionsParser(new Configuration(UTIL.getConfiguration()), args);
+    Configuration conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+    Job job = Export.createSubmittableJob(conf, args);
+    job.waitForCompletion(false);
+    return job.isSuccessful();
+  }
+
+  /**
+   * Runs an import job with the specified command line args
+   * @param args
+   * @return true if job completed successfully
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  boolean runImport(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
+    // need to make a copy of the configuration so that different temp dirs are used.
+    GenericOptionsParser opts = new GenericOptionsParser(new Configuration(UTIL.getConfiguration()), args);
+    Configuration conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+    Job job = Import.createSubmittableJob(conf, args);
+    job.waitForCompletion(false);
+    return job.isSuccessful();
   }
 
   /**
@@ -131,42 +147,19 @@ public class TestImportExport {
     String[] args = new String[] {
         EXPORT_TABLE,
-        OUTPUT_DIR,
+        FQ_OUTPUT_DIR,
         "1000", // max number of key versions per key to export
     };
-
-    GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
-    Configuration conf = opts.getConfiguration();
-
-    // copy or add the necessary configuration values from the map reduce config to the hbase config
-    copyConfigurationValues(UTIL.getConfiguration(), conf);
-    args = opts.getRemainingArgs();
-
-    Job job = Export.createSubmittableJob(conf, args);
-    job.getConfiguration().set("mapreduce.framework.name", "yarn");
-    job.waitForCompletion(false);
-    assertTrue(job.isSuccessful());
-
+    assertTrue(runExport(args));
 
     String IMPORT_TABLE = "importTableSimpleCase";
     t = UTIL.createTable(Bytes.toBytes(IMPORT_TABLE), FAMILYB);
     args = new String[] {
         "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
         IMPORT_TABLE,
-        OUTPUT_DIR
+        FQ_OUTPUT_DIR
     };
-
-    opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
-    conf = opts.getConfiguration();
-
-    // copy or add the necessary configuration values from the map reduce config to the hbase config
-    copyConfigurationValues(UTIL.getConfiguration(), conf);
-    args = opts.getRemainingArgs();
-
-    job = Import.createSubmittableJob(conf, args);
-    job.getConfiguration().set("mapreduce.framework.name", "yarn");
-    job.waitForCompletion(false);
-    assertTrue(job.isSuccessful());
+    assertTrue(runImport(args));
 
     Get g = new Get(ROW1);
     g.setMaxVersions();
@@ -186,19 +179,8 @@ public class TestImportExport {
   @Test
   public void testMetaExport() throws Exception {
     String EXPORT_TABLE = ".META.";
-    String[] args = new String[] { EXPORT_TABLE, OUTPUT_DIR, "1", "0", "0" };
-    GenericOptionsParser opts = new GenericOptionsParser(new Configuration(
-        cluster.getConfiguration()), args);
-    Configuration conf = opts.getConfiguration();
-
-    // copy or add the necessary configuration values from the map reduce config to the hbase config
-    copyConfigurationValues(UTIL.getConfiguration(), conf);
-    args = opts.getRemainingArgs();
-
-    Job job = Export.createSubmittableJob(conf, args);
-    job.getConfiguration().set("mapreduce.framework.name", "yarn");
-    job.waitForCompletion(false);
-    assertTrue(job.isSuccessful());
+    String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1", "0", "0" };
+    assertTrue(runExport(args));
   }
 
   /**
@@ -225,21 +207,12 @@ public class TestImportExport {
     String[] args = new String[] {
         "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE,  // added scanner batching arg.
         BATCH_TABLE,
-        OUTPUT_DIR
+        FQ_OUTPUT_DIR
     };
-
-    GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
-    Configuration conf = opts.getConfiguration();
-    args = opts.getRemainingArgs();
-    assertEquals(conf.get(Export.EXPORT_BATCHING), EXPORT_BATCH_SIZE);
+    assertTrue(runExport(args));
 
-    Job job = Export.createSubmittableJob(conf, args);
-    job.getConfiguration().set("mapreduce.framework.name", "yarn");
-    job.waitForCompletion(false);
-    assertTrue(job.isSuccessful());
-
     FileSystem fs = FileSystem.get(UTIL.getConfiguration());
-    fs.delete(new Path(OUTPUT_DIR), true);
+    fs.delete(new Path(FQ_OUTPUT_DIR), true);
   }
 
   @Test
@@ -266,26 +239,14 @@ public class TestImportExport {
     d = new Delete(ROW1);
     d.deleteColumns(FAMILYA, QUAL, now+2);
     t.delete(d);
-    
+
     String[] args = new String[] {
         "-D" + Export.RAW_SCAN + "=true",
         EXPORT_TABLE,
-        OUTPUT_DIR,
+        FQ_OUTPUT_DIR,
         "1000", // max number of key versions per key to export
     };
-
-    GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
-    Configuration conf = opts.getConfiguration();
-
-    // copy or add the necessary configuration values from the map reduce config to the hbase config
-    copyConfigurationValues(UTIL.getConfiguration(), conf);
-    args = opts.getRemainingArgs();
-
-    Job job = Export.createSubmittableJob(conf, args);
-    job.getConfiguration().set("mapreduce.framework.name", "yarn");
-    job.waitForCompletion(false);
-    assertTrue(job.isSuccessful());
-
+    assertTrue(runExport(args));
 
     String IMPORT_TABLE = "importWithDeletes";
     desc = new HTableDescriptor(IMPORT_TABLE);
@@ -298,20 +259,9 @@ public class TestImportExport {
     t = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
     args = new String[] {
         IMPORT_TABLE,
-        OUTPUT_DIR
+        FQ_OUTPUT_DIR
     };
-
-    opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
-    conf = opts.getConfiguration();
-
-    // copy or add the necessary configuration values from the map reduce config to the hbase config
-    copyConfigurationValues(UTIL.getConfiguration(), conf);
-    args = opts.getRemainingArgs();
-
-    job = Import.createSubmittableJob(conf, args);
-    job.getConfiguration().set("mapreduce.framework.name", "yarn");
-    job.waitForCompletion(false);
-    assertTrue(job.isSuccessful());
+    assertTrue(runImport(args));
 
     Scan s = new Scan();
     s.setMaxVersions();
@@ -329,8 +279,13 @@ public class TestImportExport {
     t.close();
   }
 
+  /**
+   * Create a simple table, run an Export Job on it, Import with filtering on, verify counts,
+   * attempt with invalid values.
+   */
   @Test
   public void testWithFilter() throws Exception {
+    // Create simple table to export
     String EXPORT_TABLE = "exportSimpleCase_ImportWithFilter";
     HTableDescriptor desc = new HTableDescriptor(EXPORT_TABLE);
     desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
@@ -345,18 +300,11 @@ public class TestImportExport {
     p.add(FAMILYA, QUAL, now + 4, QUAL);
     exportTable.put(p);
 
-    String[] args = new String[] { EXPORT_TABLE, OUTPUT_DIR, "1000" };
-
-    GenericOptionsParser opts = new GenericOptionsParser(new Configuration(
-        cluster.getConfiguration()), args);
-    Configuration conf = opts.getConfiguration();
-    args = opts.getRemainingArgs();
-
-    Job job = Export.createSubmittableJob(conf, args);
-    job.getConfiguration().set("mapreduce.framework.name", "yarn");
-    job.waitForCompletion(false);
-    assertTrue(job.isSuccessful());
+    // Export the simple table
+    String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1000" };
+    assertTrue(runExport(args));
 
+    // Import to a new table
     String IMPORT_TABLE = "importWithFilter";
     desc = new HTableDescriptor(IMPORT_TABLE);
     desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
@@ -364,17 +312,9 @@ public class TestImportExport {
     HTable importTable = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
     args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
-        "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, OUTPUT_DIR,
+        "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, FQ_OUTPUT_DIR,
         "1000" };
-
-    opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
-    conf = opts.getConfiguration();
-    args = opts.getRemainingArgs();
-
-    job = Import.createSubmittableJob(conf, args);
-    job.getConfiguration().set("mapreduce.framework.name", "yarn");
-    job.waitForCompletion(false);
-    assertTrue(job.isSuccessful());
+    assertTrue(runImport(args));
 
     // get the count of the source table for that time range
     PrefixFilter filter = new PrefixFilter(ROW1);
@@ -388,16 +328,8 @@ public class TestImportExport {
     args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
         "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", EXPORT_TABLE,
-        OUTPUT_DIR, "1000" };
-
-    opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
-    conf = opts.getConfiguration();
-    args = opts.getRemainingArgs();
-
-    job = Import.createSubmittableJob(conf, args);
-    job.getConfiguration().set("mapreduce.framework.name", "yarn");
-    job.waitForCompletion(false);
-    assertFalse("Job succeeedd, but it had a non-instantiable filter!", job.isSuccessful());
+        FQ_OUTPUT_DIR, "1000" };
+    assertFalse(runImport(args));
 
     // cleanup
     exportTable.close();
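
Note on the output-path change above: as the Export.java comment says, the Configuration inside the submitted job does not carry the cluster conf, so a bare relative path such as "outputdir" may not resolve against the mini DFS. A minimal sketch of the qualification pattern the test now uses in beforeClass() (variable names here are illustrative, not part of the patch):

    // Sketch only: qualify a relative output dir against the default filesystem
    // before passing it to the Export/Import command-line args.
    Configuration conf = UTIL.getConfiguration();
    String fqOutputDir = new Path("outputdir").makeQualified(FileSystem.get(conf)).toString();
    // e.g. hdfs://localhost:<port>/user/<user>/outputdir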