diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
index 5508acde4a5..df5994ad5b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -27,19 +29,18 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.filter.RowFilter;
-import org.apache.hadoop.hbase.filter.RegexStringComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
  * Export an HBase table.
@@ -137,10 +138,10 @@ public class Export {
 
     int batching = conf.getInt(EXPORT_BATCHING, -1);
     if (batching != -1){
-      try{
+      try {
         s.setBatch(batching);
-      } catch (RuntimeException e) {
-        LOG.error("Batching could not be set", e);
+      } catch (IncompatibleFilterException e) {
+        LOG.error("Batching could not be set", e);
       }
     }
     LOG.info("versions=" + versions + ", starttime=" + startTime +
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
index 390cc755971..f45a40b56a4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
@@ -59,7 +59,7 @@ public class TestImportExport {
   private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
   private static final byte[] QUAL = Bytes.toBytes("q");
   private static final String OUTPUT_DIR = "outputdir";
-  private static final String EXPORT_BATCHING = "100";
+  private static final String EXPORT_BATCH_SIZE = "100";
 
   private static MiniHBaseCluster cluster;
   private static long now = System.currentTimeMillis();
@@ -126,8 +126,7 @@ public class TestImportExport {
     String[] args = new String[] {
         EXPORT_TABLE,
         OUTPUT_DIR,
-        EXPORT_BATCHING,
-        "1000"
+        "1000", // max number of key versions per key to export
     };
 
     GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
@@ -196,6 +195,47 @@ public class TestImportExport {
     assertTrue(job.isSuccessful());
   }
 
+  /**
+   * Test export scanner batching
+   */
+  @Test
+  public void testExportScannerBatching() throws Exception {
+    String BATCH_TABLE = "exportWithBatch";
+    HTableDescriptor desc = new HTableDescriptor(BATCH_TABLE);
+    desc.addFamily(new HColumnDescriptor(FAMILYA)
+        .setMaxVersions(1)
+    );
+    UTIL.getHBaseAdmin().createTable(desc);
+    HTable t = new HTable(UTIL.getConfiguration(), BATCH_TABLE);
+
+    Put p = new Put(ROW1);
+    p.add(FAMILYA, QUAL, now, QUAL);
+    p.add(FAMILYA, QUAL, now+1, QUAL);
+    p.add(FAMILYA, QUAL, now+2, QUAL);
+    p.add(FAMILYA, QUAL, now+3, QUAL);
+    p.add(FAMILYA, QUAL, now+4, QUAL);
+    t.put(p);
+
+    String[] args = new String[] {
+        "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, // added scanner batching arg.
+        BATCH_TABLE,
+        OUTPUT_DIR
+    };
+
+    GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
+    Configuration conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+    assertEquals(conf.get(Export.EXPORT_BATCHING), EXPORT_BATCH_SIZE);
+
+    Job job = Export.createSubmittableJob(conf, args);
+    job.getConfiguration().set("mapreduce.framework.name", "yarn");
+    job.waitForCompletion(false);
+    assertTrue(job.isSuccessful());
+
+    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
+    fs.delete(new Path(OUTPUT_DIR), true);
+  }
+
   @Test
   public void testWithDeletes() throws Exception {
     String EXPORT_TABLE = "exportWithDeletes";
@@ -225,8 +265,7 @@ public class TestImportExport {
         "-D" + Export.RAW_SCAN + "=true",
         EXPORT_TABLE,
         OUTPUT_DIR,
-        EXPORT_BATCHING,
-        "1000"
+        "1000", // max number of key versions per key to export
     };
 
     GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
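
Note on the Export.java hunk above: Scan.setBatch(int) throws IncompatibleFilterException when the Scan already carries a filter that must operate on whole rows, which is why the catch block is narrowed from RuntimeException. A minimal standalone sketch of that behavior, not part of the patch (the class name BatchingSketch is made up for illustration):

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;

public class BatchingSketch {
  public static void main(String[] args) {
    Scan s = new Scan();
    try {
      // Cap the number of cells returned per Result; this mirrors what Export does
      // with the value read from the Export.EXPORT_BATCHING configuration key.
      s.setBatch(100);
    } catch (IncompatibleFilterException e) {
      // Thrown if a filter requiring whole rows is already set on the Scan.
      System.err.println("Batching could not be set: " + e.getMessage());
    }
  }
}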