[hbase] Bulk load tools

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@818193 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2009-09-23 18:42:34 +00:00
parent e14fa43bbc
commit 07219dc721
8 changed files with 552 additions and 9 deletions

CHANGES.txt

@@ -74,6 +74,7 @@ Release 0.21.0 - Unreleased
HBASE-1684 Backup (Export/Import) contrib tool for 0.20
HBASE-1854 Remove the Region Historian
HBASE-1860 Change HTablePool#createHTable from private to protected
HBASE-48 Bulk load tools
OPTIMIZATIONS

bin/loadtable.rb (new file, 132 lines)

@@ -0,0 +1,132 @@
# Script that takes over from org.apache.hadoop.hbase.mapreduce.HFileOutputFormat.
# Pass it output directory of HFileOutputFormat. It will read the passed files,
# move them into place and update the catalog table appropriately. Warning:
# it will overwrite anything that exists already for passed table.
# It expects hbase to be up and running so it can insert table info.
#
# To see usage for this script, run:
#
# ${HBASE_HOME}/bin/hbase org.jruby.Main loadtable.rb
#
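# For example, to load the output of HFileOutputFormat into a table named
# 'mytable' (the table name and directory below are hypothetical):
#
#   ${HBASE_HOME}/bin/hbase org.jruby.Main loadtable.rb mytable /tmp/hfileoutput
#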
include Java
import java.util.TreeMap
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.FSUtils
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.util.Writables
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HRegionInfo
import org.apache.hadoop.hbase.HTableDescriptor
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.io.hfile.HFile
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.mapred.OutputLogFilter
import org.apache.commons.logging.Log
import org.apache.commons.logging.LogFactory
# Name of this script
NAME = "loadtable"
# Print usage for this script
def usage
puts 'Usage: %s.rb TABLENAME HFILEOUTPUTFORMAT_OUTPUT_DIR' % NAME
exit!
end
# Raise an exception unless the passed 'dir' exists and is a directory
def isDirExists(fs, dir)
raise IOError.new("Does not exist: " + dir.toString()) unless fs.exists(dir)
raise IOError.new("Not a directory: " + dir.toString()) unless fs.isDirectory(dir)
end
# Check arguments
if ARGV.size != 2
usage
end
# Check good table names were passed.
tableName = HTableDescriptor.isLegalTableName(ARGV[0].to_java_bytes)
outputdir = Path.new(ARGV[1])
# Get configuration to use.
c = HBaseConfiguration.new()
# Get a logger.
LOG = LogFactory.getLog(NAME)
# Set hadoop filesystem configuration using the hbase.rootdir.
# Otherwise we'll always use the local filesystem even though the
# hbase.rootdir might be pointing at an hdfs location.
c.set("fs.default.name", c.get(HConstants::HBASE_DIR))
fs = FileSystem.get(c)
# If hfiles directory does not exist, exit.
isDirExists(fs, outputdir)
# Create table dir if it doesn't exist.
rootdir = FSUtils.getRootDir(c)
tableDir = Path.new(rootdir, Path.new(Bytes.toString(tableName)))
fs.mkdirs(tableDir) unless fs.exists(tableDir)
# Start. Per hfile, move it, and insert an entry in catalog table.
families = fs.listStatus(outputdir, OutputLogFilter.new())
raise IOError.new("Can do one family only") if families.length > 1
# Read meta on all files. Put in map keyed by end key.
map = TreeMap.new(Bytes::ByteArrayComparator.new())
family = families[0]
# List the hfiles under the single family subdir of the output dir.
hfiles = fs.listStatus(family.getPath())
LOG.info("Found " + hfiles.length.to_s + " hfiles");
count = 0
for hfile in hfiles
reader = HFile::Reader.new(fs, hfile.getPath(), nil, false)
begin
fileinfo = reader.loadFileInfo()
lastkey = reader.getLastKey()
# Last key is row/column/ts. We just want the row part.
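# The serialized key carries a 2-byte row length prefix, so read the
# length as a short and then slice out just the row bytes.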
rowlen = Bytes.toShort(lastkey)
LOG.info(count.to_s + " read lastrow of " +
Bytes.toString(lastkey[2, rowlen]) + " from " + hfile.getPath().toString())
map.put(lastkey[2, rowlen], [hfile, fileinfo])
count = count + 1
ensure
reader.close()
end
end
# Now I have a sorted list of fileinfo+paths. Start the insert.
# Get a client on catalog table.
meta = HTable.new(c, HConstants::META_TABLE_NAME)
# I can't find out from the hfile how it's compressed.
# Using all defaults. Change manually after loading if
# something else is wanted in column or table attributes.
familyName = family.getPath().getName()
hcd = HColumnDescriptor.new(familyName)
htd = HTableDescriptor.new(tableName)
htd.addFamily(hcd)
previouslastkey = HConstants::EMPTY_START_ROW
count = 0
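# Each hfile becomes one region: a region's start key is the previous
# region's end key, and its end key is this hfile's last row (empty for
# the final region, hbase's end-of-table marker).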
for i in map.keySet()
tuple = map.get(i)
startkey = previouslastkey
count = 1 + count
lastkey = i
if count == map.size()
# Then we are at the last key. Set it to the special empty-key indicator.
lastkey = HConstants::EMPTY_START_ROW
end
previouslastkey = lastkey
hri = HRegionInfo.new(htd, startkey, lastkey)
LOG.info(hri.toString())
hfile = tuple[0].getPath()
rdir = Path.new(Path.new(tableDir, hri.getEncodedName().to_s), familyName)
fs.mkdirs(rdir)
tgt = Path.new(rdir, hfile.getName())
fs.rename(hfile, tgt)
LOG.info("Moved " + hfile.toString() + " to " + tgt.toString())
p = Put.new(hri.getRegionName())
p.add(HConstants::CATALOG_FAMILY, HConstants::REGIONINFO_QUALIFIER, Writables.getBytes(hri))
meta.put(p)
LOG.info("Inserted " + hri.toString())
end

HFileOutputFormat.java (new file, 134 lines)

@@ -0,0 +1,134 @@
/**
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.mortbay.log.Log;
/**
* Writes HFiles. Passed KeyValues must arrive in order.
* Currently, can only write files to a single column family at a
* time. Writing multiple column families would require coordinating keys
* across families. Writes the current time as the sequence id for the file.
* Sets the major compacted attribute on created hfiles.
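* <p>Typical job wiring, as exercised by TestHFileOutputFormat (a sketch;
* <code>outputdir</code> stands in for whatever output path your job uses):
* <pre>
* job.setMapOutputKeyClass(ImmutableBytesWritable.class);
* job.setMapOutputValueClass(KeyValue.class);
* job.setReducerClass(KeyValueSortReducer.class);
* job.setOutputFormatClass(HFileOutputFormat.class);
* FileOutputFormat.setOutputPath(job, outputdir);
* </pre>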
* @see KeyValueSortReducer
*/
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
// Get the path of the temporary output file
final Path outputdir = FileOutputFormat.getOutputPath(context);
Configuration conf = context.getConfiguration();
final FileSystem fs = outputdir.getFileSystem(conf);
// These configs. are from hbase-*.xml
final long maxsize = conf.getLong("hbase.hregion.max.filesize", 268435456);
final int blocksize = conf.getInt("hfile.min.blocksize.size", 65536);
// Invented config. Add to hbase-*.xml if other than default compression.
final String compression = conf.get("hfile.compression",
Compression.Algorithm.NONE.getName());
return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
// Map of families to writers and how much has been output on the writer.
private final Map<byte [], WriterLength> writers =
new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
public void write(ImmutableBytesWritable row, KeyValue kv)
throws IOException {
long length = kv.getLength();
byte [] family = kv.getFamily();
WriterLength wl = this.writers.get(family);
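// Roll the writer if we have none yet for this family, or if the current
// file has grown past the maximum size and we are at a row boundary (an
// hfile must not split a row across files).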
if (wl == null || (((length + wl.written) >= maxsize) &&
Bytes.compareTo(this.previousRow, 0, this.previousRow.length,
kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()) != 0)) {
// Get a new writer.
Path basedir = new Path(outputdir, Bytes.toString(family));
if (wl == null) {
wl = new WriterLength();
this.writers.put(family, wl);
if (this.writers.size() > 1) throw new IOException("One family only");
// If wl was null, this is the first file for the family. Ensure the family dir exists.
if (!fs.exists(basedir)) fs.mkdirs(basedir);
}
wl.writer = getNewWriter(wl.writer, basedir);
Log.info("Writer=" + wl.writer.getPath() +
((wl.written == 0)? "": ", wrote=" + wl.written));
wl.written = 0;
}
wl.writer.append(kv);
wl.written += length;
// Copy the row so we can tell when we cross a row boundary.
this.previousRow = kv.getRow();
}
/* Create a new HFile.Writer. Close current if there is one.
* @param writer
* @param familydir
* @return A new HFile.Writer.
* @throws IOException
*/
private HFile.Writer getNewWriter(final HFile.Writer writer,
final Path familydir)
throws IOException {
close(writer);
return new HFile.Writer(fs, StoreFile.getUniqueFile(fs, familydir),
blocksize, compression, KeyValue.KEY_COMPARATOR);
}
private void close(final HFile.Writer w) throws IOException {
if (w != null) {
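// Append the sequence id (current time) and the major compacted flag
// before closing, as promised in the class comment.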
StoreFile.appendMetadata(w, System.currentTimeMillis(), true);
w.close();
}
}
public void close(TaskAttemptContext c)
throws IOException, InterruptedException {
for (Map.Entry<byte [], WriterLength> e: this.writers.entrySet()) {
close(e.getValue().writer);
}
}
};
}
/*
* Data structure to hold a Writer and amount of data written on it.
*/
static class WriterLength {
long written = 0;
HFile.Writer writer = null;
}
}

KeyValueSortReducer.java (new file, 50 lines)

@@ -0,0 +1,50 @@
/**
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.util.TreeSet;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Reducer;
/**
* Emits sorted KeyValues.
* Reads in all KeyValues from passed Iterator, sorts them, then emits
* KeyValues in sorted order. If there are lots of columns per row, it will
* use lots of memory sorting.
* @see HFileOutputFormat
*/
public class KeyValueSortReducer extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<KeyValue> kvs,
org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
throws java.io.IOException, InterruptedException {
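// A TreeSet ordered by KeyValue.COMPARATOR sorts on the full key
// (row/family/qualifier/timestamp/type); KeyValues whose keys compare
// equal are silently dropped by the set.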
TreeSet<KeyValue> set = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
for (KeyValue kv: kvs) {
set.add(kv);
}
context.setStatus("Read " + set.size() + " KeyValues");
int index = 0;
for (KeyValue kv: set) {
context.write(row, kv);
index++;
if (index % 100 == 0) context.setStatus("Wrote " + index);
}
}
}

package.html (org.apache.hadoop.hbase.mapreduce package documentation)

@@ -25,6 +25,7 @@ Input/OutputFormats, a table indexing MapReduce job, and utility
<ul>
<li><a href="#classpath">HBase, MapReduce and the CLASSPATH</a></li>
<li><a href="#sink">HBase as MapReduce job data source and sink</a></li>
<li><a href="#bulk">Bulk Import writing HFiles directly</a></li>
<li><a href="#examples">Example Code</a></li>
</ul>
@@ -82,7 +83,7 @@ specify source/sink table and column names in your configuration.</p>
<p>Reading from hbase, the TableInputFormat asks hbase for the list of
regions and makes a map-per-region or <code>mapred.map.tasks</code> maps,
whichever is smaller (if your job only has two maps, up mapred.map.tasks
-to a number > number of regions). Maps will run on the adjacent TaskTracker
+to a number &gt; number of regions). Maps will run on the adjacent TaskTracker
if you are running a TaskTracker and RegionServer per node.
Writing, it may make sense to avoid the reduce step and write yourself back into
hbase from inside your map. You'd do this when your job does not need the sort
@@ -103,6 +104,48 @@ alter the number of existing regions when done; otherwise use the default
partitioner.
</p>
<h2><a name="bulk">Bulk import writing HFiles directly</a></h2>
<p>If importing into a new table, it's possible to bypass the HBase API
and write your content directly to the filesystem, properly formatted as
HBase data files (HFiles). Your import will run faster, perhaps as much
as an order of magnitude faster, if not more.
</p>
<p>You will need to write a MapReduce job. The map task will know how to
pull from your data source. Your reduce task will need to be hooked up to
{@link org.apache.hadoop.hbase.mapreduce.HFileOutputFormat}. It expects to receive a row id and a value.
The row id must be formatted as a {@link org.apache.hadoop.hbase.io.ImmutableBytesWritable} and the
value as a {@link org.apache.hadoop.hbase.KeyValue} (A KeyValue holds the value for a cell and
its coordinates: row/family/qualifier/timestamp, etc.). Your reduce task
will also need to emit the KeyValues in order. See {@link org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer}
for an example reducer that emits KeyValues in order.
</p>
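<p>For illustration, here is a minimal mapper modeled on the PEtoKVMapper inner
class of TestHFileOutputFormat (the class name and the family and qualifier
names below are made-up examples, not part of any API):</p>
<pre>
static class ToKeyValueMapper
extends Mapper&lt;ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue&gt; {
  // Example coordinates only; substitute your own schema.
  private static final byte [] FAMILY = Bytes.toBytes("f");
  private static final byte [] QUALIFIER = Bytes.toBytes("q");
  protected void map(ImmutableBytesWritable row, ImmutableBytesWritable value,
      Context context)
  throws IOException, InterruptedException {
    // One KeyValue per input value, keyed on the row.
    context.write(row, new KeyValue(row.get(), FAMILY, QUALIFIER, value.get()));
  }
}
</pre>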
<p>Most importantly, you will also need to ensure that your MapReduce job
ensures a total ordering among all keys. MapReduce by default distributes
keys among reducers using a Partitioner that hashes on the map task output
key: i.e. the reducer a key ends up in is by default determined as follows
<code> (key.hashCode() &amp; Integer.MAX_VALUE) % numReduceTasks</code>.
Keys are sorted by the MapReduce framework before they are passed to the reducer
BUT the sort is scoped to the particular reducer. It's not a global sort.
Given the default hash Partitioner, if the keys were 0-4 (inclusive), and you
had configured two reducers, reducer 0 would get keys 0, 2 and 4 whereas
reducer 1 would get keys 1 and 3 (in order). For your bulk import to work,
the keys need to be ordered so reducer 0 gets keys 0-2 and reducer 1 gets keys
3-4 (see TotalOrderPartitioner in Hadoop for more on what this means).
To achieve total ordering, you will likely need to write a Partitioner
that is intimate with your table's key namespace and that knows how
to distribute keys among the reducers so a total order is maintained.
</p>
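<p>A minimal sketch of such a Partitioner, assuming (for the example only) that
row keys are decimal-formatted integers drawn evenly from a known range
<code>0..MAX_KEY</code>; the class name and constant are hypothetical:</p>
<pre>
public static class IntRangePartitioner
extends Partitioner&lt;ImmutableBytesWritable, KeyValue&gt; {
  // Assumption for the sketch: the key namespace is known in advance.
  private static final int MAX_KEY = 1024 * 1024;
  public int getPartition(ImmutableBytesWritable key, KeyValue value,
      int numPartitions) {
    int k = Integer.parseInt(Bytes.toString(key.get()));
    // Contiguous slices of the key space go to successive reducers so the
    // concatenation of reducer outputs is totally ordered.
    return Math.min(numPartitions - 1, k / ((MAX_KEY / numPartitions) + 1));
  }
}
</pre>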
<p>See org.apache.hadoop.hbase.mapreduce.TestHFileOutputFormat for an example that puts together
{@link org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer} and {@link org.apache.hadoop.hbase.mapreduce.HFileOutputFormat}.</p>
<p>HFileOutputFormat writes HFiles. When your MapReduce job finishes, your
output directory will hold many HFiles. Run the script <code>bin/loadtable.rb</code>
to move the files from the MapReduce output directory under hbase; see the head of the
script for how to run it
(<code>${HBASE_HOME}/bin/hbase org.jruby.Main loadtable.rb TABLENAME HFILEOUTPUTFORMAT_OUTPUT_DIR</code>).
The script
also adds the new table data to the hbase catalog tables. When the script completes,
on the next run of the hbase metascanner -- it usually runs every minute -- your
new table should be visible and populated.</p>
<h2><a name="examples">Example Code</a></h2>
<h3>Sample Row Counter</h3>
<p>See {@link org.apache.hadoop.hbase.mapreduce.RowCounter}. This job uses

StoreFile.java

@@ -397,15 +397,16 @@ public class StoreFile implements HConstants {
/**
* @param fs
-* @param p
+* @param dir Directory to create file in.
* @return random filename inside passed <code>dir</code>
*/
-static Path getUniqueFile(final FileSystem fs, final Path p)
+public static Path getUniqueFile(final FileSystem fs, final Path dir)
throws IOException {
-if (!fs.getFileStatus(p).isDir()) {
-throw new IOException("Expecting a directory");
+if (!fs.getFileStatus(dir).isDir()) {
+throw new IOException("Expecting " + dir.toString() +
+" to be a directory");
}
-return fs.getFileStatus(p).isDir()? getRandomFilename(fs, p): p;
+return fs.getFileStatus(dir).isDir()? getRandomFilename(fs, dir): dir;
}
/**

PerformanceEvaluation.java

@@ -94,8 +94,8 @@ public class PerformanceEvaluation implements HConstants {
private static final int ONE_GB = 1024 * 1024 * 1000;
private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH;
-static final byte [] FAMILY_NAME = Bytes.toBytes("info");
-static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
+public static final byte [] FAMILY_NAME = Bytes.toBytes("info");
+public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
protected static final HTableDescriptor TABLE_DESCRIPTOR;
static {
@@ -780,7 +780,7 @@ public class PerformanceEvaluation implements HConstants {
* consumes about 30% of CPU time.
* @return Generated random value to insert into a table cell.
*/
-static byte[] generateValue(final Random r) {
+public static byte[] generateValue(final Random r) {
byte [] b = new byte [ROW_LENGTH];
r.nextBytes(b);
return b;

TestHFileOutputFormat.java (new file, 182 lines)

@@ -0,0 +1,182 @@
/**
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat}.
* Sets up and runs a mapreduce job that writes hfile output.
* Creates a few inner classes to implement splits and an inputformat that
* emits keys and values like those of {@link PerformanceEvaluation}. Makes
* as many splits as "mapred.map.tasks" maps.
*/
public class TestHFileOutputFormat extends HBaseTestCase {
private final static int ROWSPERSPLIT = 1024;
/*
* InputFormat that makes keys and values like those used in
* PerformanceEvaluation. Makes as many splits as there are configured
* maps ("mapred.map.tasks").
*/
static class PEInputFormat extends InputFormat<ImmutableBytesWritable, ImmutableBytesWritable> {
/* Split that holds nothing but split index.
*/
static class PEInputSplit extends InputSplit implements Writable {
private int index = -1;
PEInputSplit() {
super();
}
PEInputSplit(final int i) {
this.index = i;
}
int getIndex() {
return this.index;
}
public long getLength() throws IOException, InterruptedException {
return ROWSPERSPLIT;
}
public String [] getLocations() throws IOException, InterruptedException {
return new String [] {};
}
public void readFields(DataInput in) throws IOException {
this.index = in.readInt();
}
public void write(DataOutput out) throws IOException {
out.writeInt(this.index);
}
}
public RecordReader<ImmutableBytesWritable, ImmutableBytesWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
final int startrow = ((PEInputSplit)split).getIndex() * ROWSPERSPLIT;
return new RecordReader<ImmutableBytesWritable, ImmutableBytesWritable>() {
// Starts at a particular row
private int counter = startrow;
private ImmutableBytesWritable key;
private ImmutableBytesWritable value;
private final Random random = new Random(System.currentTimeMillis());
public void close() throws IOException {
// Nothing to do.
}
public ImmutableBytesWritable getCurrentKey()
throws IOException, InterruptedException {
return this.key;
}
public ImmutableBytesWritable getCurrentValue()
throws IOException, InterruptedException {
return this.value;
}
public float getProgress() throws IOException, InterruptedException {
// Fraction of this split's rows emitted so far.
return (this.counter - startrow) / (float)ROWSPERSPLIT;
}
public void initialize(InputSplit arg0, TaskAttemptContext arg1)
throws IOException, InterruptedException {
// Nothing to do.
}
public boolean nextKeyValue() throws IOException, InterruptedException {
// Emit exactly ROWSPERSPLIT rows per split; with '>' the last row would
// duplicate the first row of the next split.
if (this.counter - startrow >= ROWSPERSPLIT) return false;
this.counter++;
this.key = new ImmutableBytesWritable(PerformanceEvaluation.format(this.counter));
this.value = new ImmutableBytesWritable(PerformanceEvaluation.generateValue(this.random));
return true;
}
};
}
public List<InputSplit> getSplits(JobContext context)
throws IOException, InterruptedException {
int count = context.getConfiguration().getInt("mapred.map.tasks", 1);
List<InputSplit> splits = new ArrayList<InputSplit>(count);
for (int i = 0; i < count; i++) {
splits.add(new PEInputSplit(i));
}
return splits;
}
}
/**
* Simple mapper that makes KeyValue output.
*/
static class PEtoKVMapper extends Mapper<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue> {
protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value,
org.apache.hadoop.mapreduce.Mapper<ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,KeyValue>.Context context)
throws java.io.IOException ,InterruptedException {
context.write(key, new KeyValue(key.get(), PerformanceEvaluation.FAMILY_NAME,
PerformanceEvaluation.QUALIFIER_NAME, value.get()));
}
}
/**
* Run small MR job.
*/
public void testWritingPEData() throws Exception {
// Set down this value or we OOME in eclipse.
this.conf.setInt("io.sort.mb", 20);
// Write a few files.
this.conf.setLong("hbase.hregion.max.filesize", 64 * 1024);
Job job = new Job(this.conf, getName());
job.setInputFormatClass(TestHFileOutputFormat.PEInputFormat.class);
job.setMapperClass(TestHFileOutputFormat.PEtoKVMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setReducerClass(KeyValueSortReducer.class);
job.setOutputFormatClass(HFileOutputFormat.class);
FileOutputFormat.setOutputPath(job, this.testDir);
assertTrue(job.waitForCompletion(false));
FileStatus [] files = this.fs.listStatus(this.testDir);
assertTrue(files.length > 0);
}
}