HBASE-12514 Cleanup HFileOutputFormat legacy code (Solomon Duskis)

Signed-off-by: stack <stack@apache.org>
Author: stack, 2014-11-25 12:04:42 -08:00
Parent: 56a03d736a
Commit: 69a437ac0d
7 changed files with 51 additions and 25 deletions

IntegrationTestBulkLoad.java

@@ -275,7 +275,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
     FileOutputFormat.setOutputPath(job, p);
 
     // Configure the partitioner and other things needed for HFileOutputFormat.
-    HFileOutputFormat.configureIncrementalLoad(job, table);
+    HFileOutputFormat2.configureIncrementalLoad(job, table, table);
     // Run the job making sure it works.
     assertEquals(true, job.waitForCompletion(true));
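
Note: the doubled argument in configureIncrementalLoad(job, table, table) type-checks because HTable implements both the Table and RegionLocator interfaces, so one handle serves both roles during the migration. A minimal caller sketch under that assumption (the table and job names are illustrative, not from this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;

Configuration conf = HBaseConfiguration.create();
Job job = Job.getInstance(conf, "bulkload-example");       // illustrative job name
HTable table = new HTable(conf, TableName.valueOf("t1"));  // illustrative table; HTable is
                                                           // both a Table and a RegionLocator
HFileOutputFormat2.configureIncrementalLoad(job, table, table);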

HFileOutputFormat.java

@@ -65,6 +65,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
   public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
     HFileOutputFormat2.DATABLOCK_ENCODING_OVERRIDE_CONF_KEY;
 
+  @Override
   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(
       final TaskAttemptContext context) throws IOException, InterruptedException {
     return HFileOutputFormat2.createRecordWriter(context);
@@ -86,7 +87,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
    */
   public static void configureIncrementalLoad(Job job, HTable table)
       throws IOException {
-    HFileOutputFormat2.configureIncrementalLoad(job, table, HFileOutputFormat.class);
+    HFileOutputFormat2.configureIncrementalLoad(job, table, table);
   }
 
   /**
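
The legacy class is now a pure shim over HFileOutputFormat2. A condensed sketch of the resulting delegation pattern (simplified from the hunks above; javadoc and the remaining constants omitted):

package org.apache.hadoop.hbase.mapreduce;  // same package: createRecordWriter is package-private

import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
  @Override
  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(
      final TaskAttemptContext context) throws IOException, InterruptedException {
    // All record writing is handed off to the successor class.
    return HFileOutputFormat2.createRecordWriter(context);
  }

  public static void configureIncrementalLoad(Job job, HTable table) throws IOException {
    // HTable satisfies both the Table and RegionLocator parameters of the new overload.
    HFileOutputFormat2.configureIncrementalLoad(job, table, table);
  }
}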

HFileOutputFormat2.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.regionserver.BloomType;
@@ -75,11 +76,11 @@ import com.google.common.annotations.VisibleForTesting;
 /**
  * Writes HFiles. Passed Cells must arrive in order.
  * Writes current time as the sequence id for the file. Sets the major compacted
- * attribute on created hfiles. Calling write(null,null) will forcibly roll
+ * attribute on created @{link {@link HFile}s. Calling write(null,null) will forcibly roll
  * all HFiles being written.
  * <p>
  * Using this class as part of a MapReduce job is best done
- * using {@link #configureIncrementalLoad(Job, HTable)}.
+ * using {@link #configureIncrementalLoad(Job, Table, RegionLocator, Class)}.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
@@ -107,6 +108,7 @@ public class HFileOutputFormat2
   public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.datablock.encoding";
 
+  @Override
   public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
       final TaskAttemptContext context) throws IOException, InterruptedException {
     return createRecordWriter(context);
@@ -114,7 +116,7 @@ public class HFileOutputFormat2
   static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
       createRecordWriter(final TaskAttemptContext context)
-      throws IOException, InterruptedException {
+      throws IOException {
 
     // Get the path of the temporary output file
     final Path outputPath = FileOutputFormat.getOutputPath(context);
@@ -155,6 +157,7 @@ public class HFileOutputFormat2
     private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
     private boolean rollRequested = false;
 
+    @Override
     public void write(ImmutableBytesWritable row, V cell)
         throws IOException {
       KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
@@ -267,6 +270,7 @@ public class HFileOutputFormat2
       }
     }
 
+    @Override
     public void close(TaskAttemptContext c)
         throws IOException, InterruptedException {
       for (WriterLength wl: this.writers.values()) {
@@ -354,13 +358,35 @@ public class HFileOutputFormat2
    * </ul>
    * The user should be sure to set the map output value class to either KeyValue or Put before
    * running this function.
+   *
+   * @deprecated Use {@link #configureIncrementalLoad(Job, Table, RegionLocator)} instead.
    */
+  @Deprecated
   public static void configureIncrementalLoad(Job job, HTable table)
       throws IOException {
-    configureIncrementalLoad(job, table, HFileOutputFormat2.class);
+    configureIncrementalLoad(job, table, table);
   }
 
-  static void configureIncrementalLoad(Job job, HTable table,
+  /**
+   * Configure a MapReduce Job to perform an incremental load into the given
+   * table. This
+   * <ul>
+   *   <li>Inspects the table to configure a total order partitioner</li>
+   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
+   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
+   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
+   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
+   *   PutSortReducer)</li>
+   * </ul>
+   * The user should be sure to set the map output value class to either KeyValue or Put before
+   * running this function.
+   */
+  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
+      throws IOException {
+    configureIncrementalLoad(job, table, regionLocator, HFileOutputFormat2.class);
+  }
+
+  static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator,
       Class<? extends OutputFormat<?, ?>> cls) throws IOException {
     Configuration conf = job.getConfiguration();
@@ -386,8 +412,8 @@ public class HFileOutputFormat2
         KeyValueSerialization.class.getName());
 
     // Use table's region boundaries for TOP split points.
-    LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
-    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
+    LOG.info("Looking up current regions for table " + table.getName());
+    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
     LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
         "to match current region count");
     job.setNumReduceTasks(startKeys.size());
@@ -401,8 +427,7 @@ public class HFileOutputFormat2
     TableMapReduceUtil.addDependencyJars(job);
     TableMapReduceUtil.initCredentials(job);
-    LOG.info("Incremental table " + Bytes.toString(table.getTableName())
-        + " output configured.");
+    LOG.info("Incremental table " + table.getName() + " output configured.");
   }
 
   public static void configureIncrementalLoadMap(Job job, Table table) throws IOException {
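
The non-deprecated path now takes an explicit RegionLocator. A hedged driver sketch using a Connection-based lookup to obtain one (the connection plumbing and all names here are assumptions about the caller's environment, not part of this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;

public class IncrementalLoadDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName name = TableName.valueOf("t1");               // illustrative table name
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(name);
         RegionLocator locator = connection.getRegionLocator(name)) {
      Job job = Job.getInstance(conf, "incremental-load");  // illustrative job name
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(KeyValue.class);
      // Region boundaries now come from the explicit RegionLocator argument.
      HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
    }
  }
}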

Import.java

@@ -452,7 +452,7 @@ public class Import extends Configured implements Tool {
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputKeyClass(ImmutableBytesWritable.class);
       job.setMapOutputValueClass(KeyValue.class);
-      HFileOutputFormat.configureIncrementalLoad(job, table);
+      HFileOutputFormat2.configureIncrementalLoad(job, table, table);
       TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
           com.google.common.base.Preconditions.class);
     } else {

ImportTsv.java

@@ -486,7 +486,7 @@ public class ImportTsv extends Configured implements Tool {
           job.setMapOutputValueClass(Put.class);
           job.setCombinerClass(PutCombiner.class);
         }
-        HFileOutputFormat.configureIncrementalLoad(job, table);
+        HFileOutputFormat2.configureIncrementalLoad(job, table, table);
       }
     } else {
       if (!admin.tableExists(tableName)) {

WALPlayer.java

@@ -265,7 +265,7 @@ public class WALPlayer extends Configured implements Tool {
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputValueClass(KeyValue.class);
-      HFileOutputFormat.configureIncrementalLoad(job, table);
+      HFileOutputFormat2.configureIncrementalLoad(job, table, table);
       TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
           com.google.common.base.Preconditions.class);
     } else {

TestHFileOutputFormat2.java

@@ -337,7 +337,7 @@ public class TestHFileOutputFormat2 {
     job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
     HTable table = Mockito.mock(HTable.class);
     setupMockStartKeys(table);
-    HFileOutputFormat2.configureIncrementalLoad(job, table);
+    HFileOutputFormat2.configureIncrementalLoad(job, table, table);
     assertEquals(job.getNumReduceTasks(), 4);
   }
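
The doubled argument works in the mock-based tests too: a Mockito mock of HTable can stand in for both the Table and the RegionLocator parameter. A hedged sketch of the kind of stubbing setupMockStartKeys performs (the exact keys are an assumption; four start keys would match the assertEquals above):

// Inside a helper that declares throws IOException, since getStartKeys() does.
byte[][] mockKeys = new byte[][] {
    HConstants.EMPTY_BYTE_ARRAY,  // the first region always starts at the empty key
    Bytes.toBytes("aaa"),
    Bytes.toBytes("ggg"),
    Bytes.toBytes("zzz")
};
Mockito.doReturn(mockKeys).when(table).getStartKeys();  // one reduce task per region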
@@ -467,7 +467,7 @@ public class TestHFileOutputFormat2 {
         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
         KeyValueSerialization.class.getName());
     setupRandomGeneratorMapper(job);
-    HFileOutputFormat2.configureIncrementalLoad(job, table);
+    HFileOutputFormat2.configureIncrementalLoad(job, table, table);
     FileOutputFormat.setOutputPath(job, outDir);
 
     assertFalse(util.getTestFileSystem().exists(outDir)) ;
@@ -810,7 +810,7 @@ public class TestHFileOutputFormat2 {
     Job job = new Job(conf, "testLocalMRIncrementalLoad");
     job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
     setupRandomGeneratorMapper(job);
-    HFileOutputFormat2.configureIncrementalLoad(job, table);
+    HFileOutputFormat2.configureIncrementalLoad(job, table, table);
     FileOutputFormat.setOutputPath(job, dir);
     context = createTestTaskAttemptContext(job);
     HFileOutputFormat2 hof = new HFileOutputFormat2();