HBASE-8768 Improve bulk load performance by moving key value construction from map phase to reduce phase (Rajeshbabu)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1508941 13f79535-47bb-0310-9956-ffa450edef68

commit 505b4e9ffd
parent b4a120164a
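
With this change, the ImportTsv bulk-load path can run the new TsvImporterTextMapper, which only locates the row key and emits the raw Text line, while the new TextSortReducer parses the lines and builds the sorted KeyValues on the reduce side. The sketch below (not part of this patch) shows one way to drive that path, mirroring the arguments used in TestImportTsv further down. The class name TextMapperBulkLoadDriver, the table name "my_table", and the paths /tmp/hfiles and /tmp/input.tsv are placeholders; the class is placed in org.apache.hadoop.hbase.mapreduce only so the ImportTsv.*_CONF_KEY constants resolve the same way they do in the tests.

package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;

public class TextMapperBulkLoadDriver {
  public static void main(String[] rawArgs) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Same -D options as TestImportTsv#testBulkOutputWithTsvImporterTextMapper;
    // the table name and paths are placeholders.
    String[] args = new String[] {
        "-D" + ImportTsv.MAPPER_CONF_KEY
            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=/tmp/hfiles",
        "my_table",
        "/tmp/input.tsv"
    };
    // GenericOptionsParser applies the -D settings to conf and leaves the
    // table name and input path as the remaining arguments.
    GenericOptionsParser opts = new GenericOptionsParser(conf, args);
    // createSubmittableJob wires up TsvImporterTextMapper, TextSortReducer and
    // HFileOutputFormat.configureIncrementalLoad, as the ImportTsv hunk below shows.
    Job job = ImportTsv.createSubmittableJob(conf, opts.getRemainingArgs());
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The HFiles written under the bulk output directory would then be loaded with the usual completebulkload step.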

HFileOutputFormat.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;

@@ -332,6 +333,8 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
       job.setReducerClass(KeyValueSortReducer.class);
     } else if (Put.class.equals(job.getMapOutputValueClass())) {
       job.setReducerClass(PutSortReducer.class);
+    } else if (Text.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(TextSortReducer.class);
     } else {
       LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
     }
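
With this hunk, HFileOutputFormat.configureIncrementalLoad picks the reducer from the job's map output value class: KeyValue selects KeyValueSortReducer, Put selects PutSortReducer, and Text now selects the new TextSortReducer. The sketch below (not part of this patch) shows a job that lands in the new branch; the class name, job name, and table handle are placeholders, and it mirrors what ImportTsv.createSubmittableJob does in the hunk further down.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class TextSortReducerWiring {
  public static Job createJob(Configuration conf, HTable table) throws Exception {
    Job job = new Job(conf, "text-sort-reducer-example"); // placeholder job name
    job.setMapperClass(TsvImporterTextMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    // Declaring Text as the map output value class is what routes
    // configureIncrementalLoad into the new TextSortReducer branch.
    job.setMapOutputValueClass(Text.class);
    HFileOutputFormat.configureIncrementalLoad(job, table);
    return job;
  }
}

Dispatching on the map output value class keeps configureIncrementalLoad backward compatible: existing KeyValue and Put producers are untouched, and unknown types still only trigger the warning.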
ImportTsv.java

@@ -42,6 +42,8 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

@@ -247,6 +249,31 @@ public class ImportTsv extends Configured implements Tool {
       }
       private static final long serialVersionUID = 1L;
     }
+
+    public Pair<Integer, Integer> parseRowKey(byte[] lineBytes, int length)
+        throws BadTsvLineException {
+      int rkColumnIndex = 0;
+      int startPos = 0, endPos = 0;
+      for (int i = 0; i <= length; i++) {
+        if (i == length || lineBytes[i] == separatorByte) {
+          endPos = i - 1;
+          if (rkColumnIndex++ == getRowKeyColumnIndex()) {
+            if ((endPos + 1) == startPos) {
+              throw new BadTsvLineException("Empty value for ROW KEY.");
+            }
+            break;
+          } else {
+            startPos = endPos + 2;
+          }
+        }
+        if (i == length) {
+          throw new BadTsvLineException(
+              "Row key does not exist as number of columns in the line"
+                  + " are less than row key position.");
+        }
+      }
+      return new Pair<Integer, Integer>(startPos, endPos);
+    }
   }

   /**

@@ -301,10 +328,22 @@ public class ImportTsv extends Configured implements Tool {
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-      job.setMapOutputValueClass(Put.class);
-      job.setCombinerClass(PutCombiner.class);
+      if (mapperClass.equals(TsvImporterTextMapper.class)) {
+        job.setMapOutputValueClass(Text.class);
+        job.setReducerClass(TextSortReducer.class);
+      } else {
+        job.setMapOutputValueClass(Put.class);
+        job.setCombinerClass(PutCombiner.class);
+      }
       HFileOutputFormat.configureIncrementalLoad(job, table);
     } else {
+      if (mapperClass.equals(TsvImporterTextMapper.class)) {
+        usage(TsvImporterTextMapper.class.toString()
+            + " should not be used for non bulkloading case. use "
+            + TsvImporterMapper.class.toString()
+            + " or custom mapper whose value type is Put.");
+        System.exit(-1);
+      }
       // No reducers. Just write straight to table. Call initTableReducerJob
       // to set up the TableOutputFormat.
       TableMapReduceUtil.initTableReducerJob(tableName, null, job);

TextSortReducer.java (new file)

@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Emits sorted KeyValues. Reads the text passed, parses it, creates the KeyValues, then sorts
+ * them and emits the KeyValues in sorted order.
+ * @see HFileOutputFormat
+ * @see KeyValueSortReducer
+ * @see PutSortReducer
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TextSortReducer extends
+    Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {
+
+  /** Timestamp for all inserted rows */
+  private long ts;
+
+  /** Column separator */
+  private String separator;
+
+  /** Should skip bad lines */
+  private boolean skipBadLines;
+
+  private Counter badLineCount;
+
+  private ImportTsv.TsvParser parser;
+
+  public long getTs() {
+    return ts;
+  }
+
+  public boolean getSkipBadLines() {
+    return skipBadLines;
+  }
+
+  public Counter getBadLineCount() {
+    return badLineCount;
+  }
+
+  public void incrementBadLineCount(int count) {
+    this.badLineCount.increment(count);
+  }
+
+  /**
+   * Handles initializing this class with objects specific to it (i.e., the parser).
+   * Common initialization that might be leveraged by a subclass is done in
+   * <code>doSetup</code>. Hence a subclass may choose to override this method
+   * and call <code>doSetup</code> as well before handling its own custom params.
+   *
+   * @param context
+   */
+  @Override
+  protected void setup(Context context) {
+    doSetup(context);
+
+    Configuration conf = context.getConfiguration();
+
+    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
+    if (parser.getRowKeyColumnIndex() == -1) {
+      throw new RuntimeException("No row key column specified");
+    }
+  }
+
+  /**
+   * Handles common parameter initialization that a subclass might want to leverage.
+   * @param context
+   */
+  protected void doSetup(Context context) {
+    Configuration conf = context.getConfiguration();
+
+    // If a custom separator has been used,
+    // decode it back from Base64 encoding.
+    separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
+    if (separator == null) {
+      separator = ImportTsv.DEFAULT_SEPARATOR;
+    } else {
+      separator = new String(Base64.decode(separator));
+    }
+
+    // Should never get 0 as we are setting this to a valid value in job configuration.
+    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
+
+    skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
+    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+  }
+
+  @Override
+  protected void reduce(
+      ImmutableBytesWritable rowKey,
+      java.lang.Iterable<Text> lines,
+      Reducer<ImmutableBytesWritable, Text,
+          ImmutableBytesWritable, KeyValue>.Context context)
+      throws java.io.IOException, InterruptedException
+  {
+    // although reduce() is called per-row, handle pathological case
+    long threshold = context.getConfiguration().getLong(
+        "reducer.row.threshold", 1L * (1<<30));
+    Iterator<Text> iter = lines.iterator();
+    while (iter.hasNext()) {
+      Set<KeyValue> kvs = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+      long curSize = 0;
+      // stop at the end or the RAM threshold
+      while (iter.hasNext() && curSize < threshold) {
+        Text line = iter.next();
+        byte[] lineBytes = line.getBytes();
+        try {
+          ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, line.getLength());
+          // Retrieve timestamp if exists
+          ts = parsed.getTimestamp(ts);
+
+          for (int i = 0; i < parsed.getColumnCount(); i++) {
+            if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()) {
+              continue;
+            }
+            KeyValue kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(),
+                parsed.getRowKeyLength(), parser.getFamily(i), 0,
+                parser.getFamily(i).length, parser.getQualifier(i), 0,
+                parser.getQualifier(i).length, ts, KeyValue.Type.Put,
+                lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
+            kvs.add(kv);
+            curSize += kv.heapSize();
+          }
+        } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
+          if (skipBadLines) {
+            System.err.println("Bad line." + badLine.getMessage());
+            incrementBadLineCount(1);
+            return;
+          }
+          throw new IOException(badLine);
+        } catch (IllegalArgumentException e) {
+          if (skipBadLines) {
+            System.err.println("Bad line." + e.getMessage());
+            incrementBadLineCount(1);
+            return;
+          }
+          throw new IOException(e);
+        }
+      }
+      context.setStatus("Read " + kvs.size() + " entries of " + kvs.getClass()
+          + "(" + StringUtils.humanReadableInt(curSize) + ")");
+      int index = 0;
+      for (KeyValue kv : kvs) {
+        context.write(rowKey, kv);
+        if (++index > 0 && index % 100 == 0)
+          context.setStatus("Wrote " + index + " key values.");
+      }
+
+      // if we have more entries to process
+      if (iter.hasNext()) {
+        // force flush because we cannot guarantee intra-row sorted order
+        context.write(null, null);
+      }
+    }
+  }
+}
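
TextSortReducer buffers the KeyValues it builds in a TreeSet and flushes them once their estimated heap size crosses the "reducer.row.threshold" setting, which defaults to 1 GB in the code above. The snippet below (not part of this patch) sketches lowering that threshold before submitting the import job; the class name and the 256 MB value are purely illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ReducerThresholdTuning {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // TextSortReducer reads this key with a 1 GB default (1L * (1 << 30));
    // 256 MB is only an example for memory-constrained reduce tasks.
    conf.setLong("reducer.row.threshold", 256L * 1024 * 1024);
    // Pass this Configuration to ImportTsv.createSubmittableJob(...) or your
    // own job driver so the reduce tasks pick the setting up.
  }
}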

TsvImporterTextMapper.java (new file)

@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Write table content out to map output files.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TsvImporterTextMapper
+    extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text>
+{
+
+  /** Column separator */
+  private String separator;
+
+  /** Should skip bad lines */
+  private boolean skipBadLines;
+  private Counter badLineCount;
+
+  private ImportTsv.TsvParser parser;
+
+  public boolean getSkipBadLines() {
+    return skipBadLines;
+  }
+
+  public Counter getBadLineCount() {
+    return badLineCount;
+  }
+
+  public void incrementBadLineCount(int count) {
+    this.badLineCount.increment(count);
+  }
+
+  /**
+   * Handles initializing this class with objects specific to it (i.e., the parser).
+   * Common initialization that might be leveraged by a subclass is done in
+   * <code>doSetup</code>. Hence a subclass may choose to override this method
+   * and call <code>doSetup</code> as well before handling its own custom params.
+   *
+   * @param context
+   */
+  @Override
+  protected void setup(Context context) {
+    doSetup(context);
+
+    Configuration conf = context.getConfiguration();
+
+    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
+    if (parser.getRowKeyColumnIndex() == -1) {
+      throw new RuntimeException("No row key column specified");
+    }
+  }
+
+  /**
+   * Handles common parameter initialization that a subclass might want to leverage.
+   * @param context
+   */
+  protected void doSetup(Context context) {
+    Configuration conf = context.getConfiguration();
+
+    // If a custom separator has been used,
+    // decode it back from Base64 encoding.
+    separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
+    if (separator == null) {
+      separator = ImportTsv.DEFAULT_SEPARATOR;
+    } else {
+      separator = new String(Base64.decode(separator));
+    }
+
+    skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
+    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+  }
+
+  /**
+   * Convert a line of TSV text into an HBase table row.
+   */
+  @Override
+  public void map(LongWritable offset, Text value, Context context) throws IOException {
+    try {
+      Pair<Integer, Integer> rowKeyOffests = parser.parseRowKey(value.getBytes(), value.getLength());
+      ImmutableBytesWritable rowKey = new ImmutableBytesWritable(
+          value.getBytes(), rowKeyOffests.getFirst(), rowKeyOffests.getSecond());
+      context.write(rowKey, value);
+    } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
+      if (skipBadLines) {
+        System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
+        incrementBadLineCount(1);
+        return;
+      }
+      throw new IOException(badLine);
+    } catch (IllegalArgumentException e) {
+      if (skipBadLines) {
+        System.err.println("Bad line at offset: " + offset.get() + ":\n" + e.getMessage());
+        incrementBadLineCount(1);
+        return;
+      } else {
+        throw new IOException(e);
+      }
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      Thread.currentThread().interrupt();
+    }
+  }
+}

TestImportTsv.java

@@ -47,7 +47,10 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.AfterClass;

@@ -181,6 +184,49 @@ public class TestImportTsv implements Configurable {
     util.deleteTable(table);
   }
+
+  @Test
+  public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
+    String table = "test-" + UUID.randomUUID();
+    Path bulkOutputPath = new Path(util.getDataTestDir(table), "hfiles");
+    String INPUT_FILE = "InputFile1.csv";
+    // Prepare the arguments required for the test.
+    String[] args =
+        new String[] {
+            "-D" + ImportTsv.MAPPER_CONF_KEY
+                + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
+            "-D" + ImportTsv.COLUMNS_CONF_KEY
+                + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+            "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
+            "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), table,
+            INPUT_FILE
+        };
+    GenericOptionsParser opts = new GenericOptionsParser(util.getConfiguration(), args);
+    args = opts.getRemainingArgs();
+    Job job = ImportTsv.createSubmittableJob(util.getConfiguration(), args);
+    assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class));
+    assertTrue(job.getReducerClass().equals(TextSortReducer.class));
+    assertTrue(job.getMapOutputValueClass().equals(Text.class));
+  }
+
+  @Test
+  public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
+    String table = "test-" + UUID.randomUUID();
+    String FAMILY = "FAM";
+    Path bulkOutputPath = new Path(util.getDataTestDir(table), "hfiles");
+    // Prepare the arguments required for the test.
+    String[] args =
+        new String[] {
+            "-D" + ImportTsv.MAPPER_CONF_KEY
+                + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
+            "-D" + ImportTsv.COLUMNS_CONF_KEY
+                + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+            "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
+            "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), table
+        };
+    String data = "KEY\u001bVALUE4\u001bVALUE8\n";
+    doMROnTableTest(util, FAMILY, data, args, 4);
+  }
+
   protected static Tool doMROnTableTest(HBaseTestingUtility util, String family,
       String data, String[] args) throws Exception {
     return doMROnTableTest(util, family, data, args, 1);

TestImportTsvParser.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;

@@ -186,4 +187,42 @@ public class TestImportTsvParser {
     byte[] line = Bytes.toBytes("rowkey\tval_a");
     parser.parse(line, line.length);
   }
+
+  @Test
+  public void testTsvParserParseRowKey() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    byte[] line = Bytes.toBytes("rowkey\tval_a\t1234");
+    Pair<Integer, Integer> rowKeyOffsets = parser
+        .parseRowKey(line, line.length);
+    assertEquals(0, rowKeyOffsets.getFirst().intValue());
+    assertEquals(5, rowKeyOffsets.getSecond().intValue());
+    try {
+      line = Bytes.toBytes("\t\tval_a\t1234");
+      parser.parseRowKey(line, line.length);
+      fail("Should get BadTsvLineException on empty rowkey.");
+    } catch (BadTsvLineException b) {
+
+    }
+    parser = new TsvParser("col_a,HBASE_ROW_KEY,HBASE_TS_KEY", "\t");
+    assertEquals(1, parser.getRowKeyColumnIndex());
+    line = Bytes.toBytes("val_a\trowkey\t1234");
+    rowKeyOffsets = parser.parseRowKey(line, line.length);
+    assertEquals(6, rowKeyOffsets.getFirst().intValue());
+    assertEquals(11, rowKeyOffsets.getSecond().intValue());
+    try {
+      line = Bytes.toBytes("val_a");
+      rowKeyOffsets = parser.parseRowKey(line, line.length);
+      fail("Should get BadTsvLineException when number of columns less than rowkey position.");
+    } catch (BadTsvLineException b) {
+
+    }
+    parser = new TsvParser("col_a,HBASE_TS_KEY,HBASE_ROW_KEY", "\t");
+    assertEquals(2, parser.getRowKeyColumnIndex());
+    line = Bytes.toBytes("val_a\t1234\trowkey");
+    rowKeyOffsets = parser.parseRowKey(line, line.length);
+    assertEquals(11, rowKeyOffsets.getFirst().intValue());
+    assertEquals(16, rowKeyOffsets.getSecond().intValue());
+  }
+
 }