HBASE-3880 Make mapper function in ImportTSV plug-able

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1124542 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-05-19 05:48:37 +00:00
parent 31965fe0fb
commit e0ad6f0ed1
5 changed files with 265 additions and 127 deletions

View File

@ -219,6 +219,7 @@ Release 0.91.0 - Unreleased
HBASE-3797 StoreFile Level Compaction Locking
HBASE-1476 Multithreaded Compactions
HBASE-3877 Determine Proper Defaults for Compaction ThreadPools
HBASE-3880 Make mapper function in ImportTSV plug-able (Bill Graham)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel

View File

@ -28,17 +28,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@ -59,12 +53,14 @@ import com.google.common.collect.Lists;
public class ImportTsv {
final static String NAME = "importtsv";
final static String MAPPER_CONF_KEY = "importtsv.mapper.class";
final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output";
final static String COLUMNS_CONF_KEY = "importtsv.columns";
final static String SEPARATOR_CONF_KEY = "importtsv.separator";
final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
final static String DEFAULT_SEPARATOR = "\t";
final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
static class TsvParser {
/**
@ -76,7 +72,7 @@ public class ImportTsv {
private final byte separatorByte;
private int rowKeyColumnIndex;
public static String ROWKEY_COLUMN_SPEC="HBASE_ROW_KEY";
/**
@ -93,7 +89,7 @@ public class ImportTsv {
// Configure columns
ArrayList<String> columnStrings = Lists.newArrayList(
Splitter.on(',').trimResults().split(columnsSpecification));
families = new byte[columnStrings.size()][];
qualifiers = new byte[columnStrings.size()][];
@ -113,7 +109,7 @@ public class ImportTsv {
}
}
}
public int getRowKeyColumnIndex() {
return rowKeyColumnIndex;
}
@ -123,7 +119,7 @@ public class ImportTsv {
public byte[] getQualifier(int idx) {
return qualifiers[idx];
}
public ParsedLine parse(byte[] lineBytes, int length)
throws BadTsvLineException {
// Enumerate separator offsets
@ -146,16 +142,16 @@ public class ImportTsv {
}
return new ParsedLine(tabOffsets, lineBytes);
}
class ParsedLine {
private final ArrayList<Integer> tabOffsets;
private byte[] lineBytes;
ParsedLine(ArrayList<Integer> tabOffsets, byte[] lineBytes) {
this.tabOffsets = tabOffsets;
this.lineBytes = lineBytes;
}
public int getRowKeyOffset() {
return getColumnOffset(rowKeyColumnIndex);
}
@ -167,7 +163,7 @@ public class ImportTsv {
return tabOffsets.get(idx - 1) + 1;
else
return 0;
}
}
public int getColumnLength(int idx) {
return tabOffsets.get(idx) - getColumnOffset(idx);
}
@ -178,7 +174,7 @@ public class ImportTsv {
return lineBytes;
}
}
public static class BadTsvLineException extends Exception {
public BadTsvLineException(String err) {
super(err);
@ -186,103 +182,6 @@ public class ImportTsv {
private static final long serialVersionUID = 1L;
}
}
/**
* Write table content out to files in hdfs.
*/
static class TsvImporter
extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
{
/** Timestamp for all inserted rows */
private long ts;
/** Should skip bad lines */
private boolean skipBadLines;
private Counter badLineCount;
private TsvParser parser;
@Override
protected void setup(Context context) {
Configuration conf = context.getConfiguration();
// If a custom separator has been used,
// decode it back from Base64 encoding.
String separator = conf.get(SEPARATOR_CONF_KEY);
if (separator == null) {
separator = DEFAULT_SEPARATOR;
} else {
separator = new String(Base64.decode(separator));
}
parser = new TsvParser(conf.get(COLUMNS_CONF_KEY),
separator);
if (parser.getRowKeyColumnIndex() == -1) {
throw new RuntimeException("No row key column specified");
}
ts = conf.getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
skipBadLines = context.getConfiguration().getBoolean(
SKIP_LINES_CONF_KEY, true);
badLineCount = context.getCounter("ImportTsv", "Bad Lines");
}
/**
* Convert a line of TSV text into an HBase table row.
*/
@Override
public void map(LongWritable offset, Text value,
Context context)
throws IOException {
byte[] lineBytes = value.getBytes();
try {
TsvParser.ParsedLine parsed = parser.parse(
lineBytes, value.getLength());
ImmutableBytesWritable rowKey =
new ImmutableBytesWritable(lineBytes,
parsed.getRowKeyOffset(),
parsed.getRowKeyLength());
Put put = new Put(rowKey.copyBytes());
for (int i = 0; i < parsed.getColumnCount(); i++) {
if (i == parser.getRowKeyColumnIndex()) continue;
KeyValue kv = new KeyValue(
lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
parser.getFamily(i), 0, parser.getFamily(i).length,
parser.getQualifier(i), 0, parser.getQualifier(i).length,
ts,
KeyValue.Type.Put,
lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
put.add(kv);
}
context.write(rowKey, put);
} catch (BadTsvLineException badLine) {
if (skipBadLines) {
System.err.println(
"Bad line at offset: " + offset.get() + ":\n" +
badLine.getMessage());
badLineCount.increment(1);
return;
} else {
throw new IOException(badLine);
}
} catch (IllegalArgumentException e) {
if (skipBadLines) {
System.err.println(
"Bad line at offset: " + offset.get() + ":\n" +
e.getMessage());
badLineCount.increment(1);
return;
} else {
throw new IOException(e);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* Sets up the actual job.
@ -293,7 +192,7 @@ public class ImportTsv {
* @throws IOException When setting up the job fails.
*/
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
throws IOException, ClassNotFoundException {
// Support non-XML supported characters
// by re-encoding the passed separator as a Base64 string.
@ -303,13 +202,18 @@ public class ImportTsv {
Base64.encodeBytes(actualSeparator.getBytes())));
}
// See if a non-default Mapper was set
String mapperClassName = conf.get(MAPPER_CONF_KEY);
Class mapperClass = mapperClassName != null ?
Class.forName(mapperClassName) : DEFAULT_MAPPER;
String tableName = args[0];
Path inputDir = new Path(args[1]);
Job job = new Job(conf, NAME + "_" + tableName);
job.setJarByClass(TsvImporter.class);
job.setJarByClass(mapperClass);
FileInputFormat.setInputPaths(job, inputDir);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(TsvImporter.class);
job.setMapperClass(mapperClass);
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
if (hfileOutPath != null) {
@ -326,9 +230,9 @@ public class ImportTsv {
TableMapReduceUtil.initTableReducerJob(tableName, null, job);
job.setNumReduceTasks(0);
}
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
com.google.common.base.Function.class /* Guava used by TsvParser */);
return job;
}
@ -358,7 +262,8 @@ public class ImportTsv {
"Other options that may be specified with -D include:\n" +
" -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
" '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" +
" -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n";
" -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" +
" -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " + DEFAULT_MAPPER.getName() + "\n";
System.err.println(usage);
}

View File

@ -0,0 +1,148 @@
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
/**
* Write table content out to files in hdfs.
*/
public class TsvImporterMapper
extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
{
/** Timestamp for all inserted rows */
private long ts;
/** Column seperator */
private String separator;
/** Should skip bad lines */
private boolean skipBadLines;
private Counter badLineCount;
private ImportTsv.TsvParser parser;
public long getTs() {
return ts;
}
public boolean getSkipBadLines() {
return skipBadLines;
}
public Counter getBadLineCount() {
return badLineCount;
}
public void incrementBadLineCount(int count) {
this.badLineCount.increment(count);
}
/**
* Handles initializing this class with objects specific to it (i.e., the parser).
* Common initialization that might be leveraged by a subsclass is done in
* <code>doSetup</code>. Hence a subclass may choose to override this method
* and call <code>doSetup</code> as well before handling it's own custom params.
*
* @param context
*/
@Override
protected void setup(Context context) {
doSetup(context);
Configuration conf = context.getConfiguration();
parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
separator);
if (parser.getRowKeyColumnIndex() == -1) {
throw new RuntimeException("No row key column specified");
}
}
/**
* Handles common parameter initialization that a subclass might want to leverage.
* @param context
*/
protected void doSetup(Context context) {
Configuration conf = context.getConfiguration();
// If a custom separator has been used,
// decode it back from Base64 encoding.
separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
if (separator == null) {
separator = ImportTsv.DEFAULT_SEPARATOR;
} else {
separator = new String(Base64.decode(separator));
}
ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, System.currentTimeMillis());
skipBadLines = context.getConfiguration().getBoolean(
ImportTsv.SKIP_LINES_CONF_KEY, true);
badLineCount = context.getCounter("ImportTsv", "Bad Lines");
}
/**
* Convert a line of TSV text into an HBase table row.
*/
@Override
public void map(LongWritable offset, Text value,
Context context)
throws IOException {
byte[] lineBytes = value.getBytes();
try {
ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
lineBytes, value.getLength());
ImmutableBytesWritable rowKey =
new ImmutableBytesWritable(lineBytes,
parsed.getRowKeyOffset(),
parsed.getRowKeyLength());
Put put = new Put(rowKey.copyBytes());
for (int i = 0; i < parsed.getColumnCount(); i++) {
if (i == parser.getRowKeyColumnIndex()) continue;
KeyValue kv = new KeyValue(
lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
parser.getFamily(i), 0, parser.getFamily(i).length,
parser.getQualifier(i), 0, parser.getQualifier(i).length,
ts,
KeyValue.Type.Put,
lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
put.add(kv);
}
context.write(rowKey, put);
} catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
if (skipBadLines) {
System.err.println(
"Bad line at offset: " + offset.get() + ":\n" +
badLine.getMessage());
incrementBadLineCount(1);
return;
} else {
throw new IOException(badLine);
}
} catch (IllegalArgumentException e) {
if (skipBadLines) {
System.err.println(
"Bad line at offset: " + offset.get() + ":\n" +
e.getMessage());
incrementBadLineCount(1);
return;
} else {
throw new IOException(e);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@ -107,11 +107,11 @@ public class TestImportTsv {
parsed.getColumnLength(i)));
}
if (!Iterables.elementsEqual(parsedCols, expected)) {
fail("Expected: " + Joiner.on(",").join(expected) + "\n" +
fail("Expected: " + Joiner.on(",").join(expected) + "\n" +
"Got:" + Joiner.on(",").join(parsedCols));
}
}
private void assertBytesEquals(byte[] a, byte[] b) {
assertEquals(Bytes.toStringBinary(a), Bytes.toStringBinary(b));
}
@ -153,7 +153,7 @@ public class TestImportTsv {
String TABLE_NAME = "TestTable";
String FAMILY = "FAM";
String INPUT_FILE = "InputFile.esv";
// Prepare the arguments required for the test.
String[] args = new String[] {
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
@ -162,6 +162,29 @@ public class TestImportTsv {
INPUT_FILE
};
doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 1);
}
@Test
public void testMROnTableWithCustomMapper()
throws Exception {
String TABLE_NAME = "TestTable";
String FAMILY = "FAM";
String INPUT_FILE = "InputFile2.esv";
// Prepare the arguments required for the test.
String[] args = new String[] {
"-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper",
TABLE_NAME,
INPUT_FILE
};
doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3);
}
private void doMROnTableTest(String inputFile, String family, String tableName,
String[] args, int valueMultiplier) throws Exception {
// Cluster
HBaseTestingUtility htu1 = new HBaseTestingUtility();
@ -172,15 +195,15 @@ public class TestImportTsv {
args = opts.getRemainingArgs();
try {
FileSystem fs = FileSystem.get(conf);
FSDataOutputStream op = fs.create(new Path(INPUT_FILE), true);
FSDataOutputStream op = fs.create(new Path(inputFile), true);
String line = "KEY\u001bVALUE1\u001bVALUE2\n";
op.write(line.getBytes(HConstants.UTF8_ENCODING));
op.close();
final byte[] FAM = Bytes.toBytes(FAMILY);
final byte[] TAB = Bytes.toBytes(TABLE_NAME);
final byte[] FAM = Bytes.toBytes(family);
final byte[] TAB = Bytes.toBytes(tableName);
final byte[] QA = Bytes.toBytes("A");
final byte[] QB = Bytes.toBytes("B");
@ -210,9 +233,9 @@ public class TestImportTsv {
assertEquals(toU8Str(kvs.get(1).getRow()),
toU8Str(Bytes.toBytes("KEY")));
assertEquals(toU8Str(kvs.get(0).getValue()),
toU8Str(Bytes.toBytes("VALUE1")));
toU8Str(Bytes.toBytes("VALUE" + valueMultiplier)));
assertEquals(toU8Str(kvs.get(1).getValue()),
toU8Str(Bytes.toBytes("VALUE2")));
toU8Str(Bytes.toBytes("VALUE" + 2*valueMultiplier)));
// Only one result set is expected, so let it loop.
}
verified = true;

View File

@ -0,0 +1,61 @@
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.KeyValue;
import java.io.IOException;
/**
* Dummy mapper used for unit tests to verify that the mapper can be injected.
* This approach would be used if a custom transformation needed to be done after
* reading the input data before writing it to HFiles.
*/
public class TsvImporterCustomTestMapper extends TsvImporterMapper {
@Override
protected void setup(Context context) {
doSetup(context);
}
/**
* Convert a line of TSV text into an HBase table row after transforming the
* values by multiplying them by 3.
*/
@Override
public void map(LongWritable offset, Text value, Context context)
throws IOException {
byte[] family = Bytes.toBytes("FAM");
final byte[][] qualifiers = { Bytes.toBytes("A"), Bytes.toBytes("B") };
// do some basic line parsing
byte[] lineBytes = value.getBytes();
String[] valueTokens = new String(lineBytes, "UTF-8").split("\u001b");
// create the rowKey and Put
ImmutableBytesWritable rowKey =
new ImmutableBytesWritable(Bytes.toBytes(valueTokens[0]));
Put put = new Put(rowKey.copyBytes());
//The value should look like this: VALUE1 or VALUE2. Let's multiply
//the integer by 3
for(int i = 1; i < valueTokens.length; i++) {
String prefix = valueTokens[i].substring(0, "VALUE".length());
String suffix = valueTokens[i].substring("VALUE".length());
String newValue = prefix + Integer.parseInt(suffix) * 3;
KeyValue kv = new KeyValue(rowKey.copyBytes(), family,
qualifiers[i-1], Bytes.toBytes(newValue));
put.add(kv);
}
try {
context.write(rowKey, put);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}