From 53d775f24262e84782430cf31788cab52e796ae9 Mon Sep 17 00:00:00 2001
From: stack
Date: Tue, 25 Nov 2014 12:04:42 -0800
Subject: [PATCH] HBASE-12514 Cleanup HFileOutputFormat legacy code (Solomon Duskis)

Signed-off-by: stack
---
 .../mapreduce/IntegrationTestBulkLoad.java    |  2 +-
 .../hbase/mapreduce/HFileOutputFormat.java    |  3 +-
 .../hbase/mapreduce/HFileOutputFormat2.java   | 43 +++++++++++++++----
 .../apache/hadoop/hbase/mapreduce/Import.java |  2 +-
 .../hadoop/hbase/mapreduce/ImportTsv.java     |  2 +-
 .../hadoop/hbase/mapreduce/WALPlayer.java     |  2 +-
 .../mapreduce/TestHFileOutputFormat2.java     |  6 +--
 7 files changed, 43 insertions(+), 17 deletions(-)

diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
index 6c52e4f9557..185fbae9cd5 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
@@ -275,7 +275,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
     FileOutputFormat.setOutputPath(job, p);
 
     // Configure the partitioner and other things needed for HFileOutputFormat.
-    HFileOutputFormat.configureIncrementalLoad(job, table);
+    HFileOutputFormat2.configureIncrementalLoad(job, table, table);
 
     // Run the job making sure it works.
     assertEquals(true, job.waitForCompletion(true));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
index c10359e6003..402381b36c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
@@ -65,6 +65,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
 
+  @Override
   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(
       final TaskAttemptContext context) throws IOException, InterruptedException {
     return HFileOutputFormat2.createRecordWriter(context);
@@ -86,7 +87,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
   public static void configureIncrementalLoad(Job job, HTable table)
       throws IOException {
-    HFileOutputFormat2.configureIncrementalLoad(job, table, HFileOutputFormat.class);
+    HFileOutputFormat2.configureIncrementalLoad(job, table, table, HFileOutputFormat.class);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
  * Using this class as part of a MapReduce job is best done
- * using {@link #configureIncrementalLoad(Job, HTable)}.
+ * using {@link #configureIncrementalLoad(Job, Table, RegionLocator, Class)}.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
@@ -107,6 +108,7 @@ public class HFileOutputFormat2
   public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.datablock.encoding";
 
+  @Override
   public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
       final TaskAttemptContext context) throws IOException, InterruptedException {
     return createRecordWriter(context);
@@ -114,7 +116,7 @@ public class HFileOutputFormat2
 
   static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
       final TaskAttemptContext context)
-      throws IOException, InterruptedException {
+      throws IOException {
 
     // Get the path of the temporary output file
     final Path outputPath = FileOutputFormat.getOutputPath(context);
@@ -155,6 +157,7 @@ public class HFileOutputFormat2
       private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
       private boolean rollRequested = false;
 
+      @Override
       public void write(ImmutableBytesWritable row, V cell)
           throws IOException {
         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
@@ -267,6 +270,7 @@ public class HFileOutputFormat2
         }
       }
 
+      @Override
       public void close(TaskAttemptContext c)
           throws IOException, InterruptedException {
         for (WriterLength wl: this.writers.values()) {
@@ -354,13 +358,35 @@ public class HFileOutputFormat2
    *
    * The user should be sure to set the map output value class to either KeyValue or Put before
    * running this function.
+   *
+   * @deprecated Use {@link #configureIncrementalLoad(Job, Table, RegionLocator)} instead.
    */
+  @Deprecated
   public static void configureIncrementalLoad(Job job, HTable table)
       throws IOException {
-    configureIncrementalLoad(job, table, HFileOutputFormat2.class);
+    configureIncrementalLoad(job, table, table);
   }
 
-  static void configureIncrementalLoad(Job job, HTable table,
+  /**
+   * Configure a MapReduce Job to perform an incremental load into the given
+   * table. This
+   * <ul>
+   *   <li>Inspects the table to configure a total order partitioner</li>
+   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
+   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
+   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
+   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
+   *     PutSortReducer)</li>
+   * </ul>
+   * The user should be sure to set the map output value class to either KeyValue or Put before
+   * running this function.
+   */
+  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
+      throws IOException {
+    configureIncrementalLoad(job, table, regionLocator, HFileOutputFormat2.class);
+  }
+
+  static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator,
       Class<? extends OutputFormat<?, ?>> cls) throws IOException {
     Configuration conf = job.getConfiguration();
@@ -386,8 +412,8 @@ public class HFileOutputFormat2
         KeyValueSerialization.class.getName());
 
     // Use table's region boundaries for TOP split points.
-    LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
-    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
+    LOG.info("Looking up current regions for table " + table.getName());
+    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
     LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
         "to match current region count");
     job.setNumReduceTasks(startKeys.size());
@@ -401,8 +427,7 @@ public class HFileOutputFormat2
 
     TableMapReduceUtil.addDependencyJars(job);
     TableMapReduceUtil.initCredentials(job);
-    LOG.info("Incremental table " + Bytes.toString(table.getTableName())
-        + " output configured.");
+    LOG.info("Incremental table " + table.getName() + " output configured.");
   }
 
   public static void configureIncrementalLoadMap(Job job, Table table) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
index d867fdd8fa4..399d6079914 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -447,7 +447,7 @@ public class Import {
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputKeyClass(ImmutableBytesWritable.class);
       job.setMapOutputValueClass(KeyValue.class);
-      HFileOutputFormat.configureIncrementalLoad(job, table);
+      HFileOutputFormat2.configureIncrementalLoad(job, table, table);
       TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
           com.google.common.base.Preconditions.class);
     } else {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index d09f7876d51..7ae659986aa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -486,7 +486,7 @@ public class ImportTsv extends Configured implements Tool {
           job.setMapOutputValueClass(Put.class);
           job.setCombinerClass(PutCombiner.class);
         }
-        HFileOutputFormat.configureIncrementalLoad(job, table);
+        HFileOutputFormat2.configureIncrementalLoad(job, table, table);
       }
     } else {
       if (!admin.tableExists(tableName)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 39b573df36d..415d14c9d9f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -251,7 +251,7 @@ public class WALPlayer extends Configured implements Tool {
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputValueClass(KeyValue.class);
-      HFileOutputFormat.configureIncrementalLoad(job, table);
+      HFileOutputFormat2.configureIncrementalLoad(job, table, table);
       TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
           com.google.common.base.Preconditions.class);
     } else {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 01e4963a8b3..08e6b6cf628 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -336,7 +336,7 @@ public class TestHFileOutputFormat2 {
     job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
     HTable table = Mockito.mock(HTable.class);
     setupMockStartKeys(table);
-    HFileOutputFormat2.configureIncrementalLoad(job, table);
+    HFileOutputFormat2.configureIncrementalLoad(job, table, table);
     assertEquals(job.getNumReduceTasks(), 4);
   }
 
@@ -466,7 +466,7 @@ public class TestHFileOutputFormat2 {
         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
         KeyValueSerialization.class.getName());
     setupRandomGeneratorMapper(job);
-    HFileOutputFormat2.configureIncrementalLoad(job, table);
+    HFileOutputFormat2.configureIncrementalLoad(job, table, table);
     FileOutputFormat.setOutputPath(job, outDir);
 
     assertFalse(util.getTestFileSystem().exists(outDir)) ;
@@ -809,7 +809,7 @@ public class TestHFileOutputFormat2 {
       Job job = new Job(conf, "testLocalMRIncrementalLoad");
       job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
       setupRandomGeneratorMapper(job);
-      HFileOutputFormat2.configureIncrementalLoad(job, table);
+      HFileOutputFormat2.configureIncrementalLoad(job, table, table);
       FileOutputFormat.setOutputPath(job, dir);
       context = createTestTaskAttemptContext(job);
       HFileOutputFormat2 hof = new HFileOutputFormat2();
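
Usage note (not part of the patch): after this change, client code that called the
deprecated HFileOutputFormat.configureIncrementalLoad(job, table) migrates to the
HFileOutputFormat2 overload taking a Table and a RegionLocator. The sketch below is a
minimal, hypothetical driver: the table name, output path, and job wiring are
placeholder assumptions. It relies on the fact this patch uses at every call site,
namely that HTable implements both Table and RegionLocator in this version of the API,
so the same instance can be passed for both parameters.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class BulkLoadSetupExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "prepare-hfiles");

        // Mapper/input configuration elided: per the Javadoc in this patch, the
        // map output key must be ImmutableBytesWritable and the value KeyValue
        // or Put before configureIncrementalLoad is called.
        FileOutputFormat.setOutputPath(job, new Path("/tmp/hfile-out"));

        // HTable serves as both the Table and the RegionLocator argument here;
        // configureIncrementalLoad reads the region boundaries up front to set
        // up the total order partitioner and the reduce task count, so the
        // table handle is not needed after configuration.
        try (HTable table = new HTable(conf, TableName.valueOf("example_table"))) {
          HFileOutputFormat2.configureIncrementalLoad(job, table, table);
        }

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

Callers that have already moved off HTable can instead pass any Table plus a separate
RegionLocator to the new three-argument overload.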