diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
index d800072f82c..4cc997ac347 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -332,6 +333,8 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue>
     if (KeyValue.class.equals(job.getMapOutputValueClass())) {
       job.setReducerClass(KeyValueSortReducer.class);
     } else if (Put.class.equals(job.getMapOutputValueClass())) {
       job.setReducerClass(PutSortReducer.class);
+    } else if (Text.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(TextSortReducer.class);
     } else {
       LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+    /**
+     * Returns the offsets of the row key in the given line as a pair of the
+     * index of its first byte and the index of its last byte (inclusive).
+     */
+    public Pair<Integer, Integer> parseRowKey(byte[] lineBytes, int length)
+        throws BadTsvLineException {
+      int rkColumnIndex = 0;
+      int startPos = 0, endPos = 0;
+      for (int i = 0; i <= length; i++) {
+        if (i == length || lineBytes[i] == separatorByte) {
+          endPos = i - 1;
+          if (rkColumnIndex++ == getRowKeyColumnIndex()) {
+            if ((endPos + 1) == startPos) {
+              throw new BadTsvLineException("Empty value for ROW KEY.");
+            }
+            break;
+          } else {
+            startPos = endPos + 2;
+          }
+        }
+        if (i == length) {
+          throw new BadTsvLineException(
+              "Row key does not exist as number of columns in the line"
+                  + " are less than row key position.");
+        }
+      }
+      return new Pair<Integer, Integer>(startPos, endPos);
+    }
   }
 
   /**
@@ -301,10 +328,22 @@ public class ImportTsv extends Configured implements Tool {
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-      job.setMapOutputValueClass(Put.class);
-      job.setCombinerClass(PutCombiner.class);
+      if (mapperClass.equals(TsvImporterTextMapper.class)) {
+        job.setMapOutputValueClass(Text.class);
+        job.setReducerClass(TextSortReducer.class);
+      } else {
+        job.setMapOutputValueClass(Put.class);
+        job.setCombinerClass(PutCombiner.class);
+      }
       HFileOutputFormat.configureIncrementalLoad(job, table);
     } else {
+      if (mapperClass.equals(TsvImporterTextMapper.class)) {
+        usage(TsvImporterTextMapper.class.toString()
+            + " should not be used for the non-bulkload case. Use "
+            + TsvImporterMapper.class.toString()
+            + " or a custom mapper whose value type is Put.");
+        System.exit(-1);
+      }
       // No reducers. Just write straight to table. Call initTableReducerJob
       // to set up the TableOutputFormat.
       TableMapReduceUtil.initTableReducerJob(tableName, null, job);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
new file mode 100644
index 00000000000..d50307928f0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Emits sorted KeyValues. Parses the text lines passed to it, creates KeyValues
+ * from each line, and emits them in sorted order.
+ * @see HFileOutputFormat
+ * @see KeyValueSortReducer
+ * @see PutSortReducer
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TextSortReducer extends
+    Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {
+
+  /** Timestamp for all inserted rows */
+  private long ts;
+
+  /** Column separator */
+  private String separator;
+
+  /** Should skip bad lines */
+  private boolean skipBadLines;
+
+  private Counter badLineCount;
+
+  private ImportTsv.TsvParser parser;
+
+  public long getTs() {
+    return ts;
+  }
+
+  public boolean getSkipBadLines() {
+    return skipBadLines;
+  }
+
+  public Counter getBadLineCount() {
+    return badLineCount;
+  }
+
+  public void incrementBadLineCount(int count) {
+    this.badLineCount.increment(count);
+  }
+
+  /**
+   * Handles initializing this class with objects specific to it (i.e., the parser).
+   * Common initialization that might be leveraged by a subclass is done in
+   * doSetup. Hence a subclass may choose to override this method
+   * and call doSetup as well before handling its own custom params.
+   *
+   * @param context
+   */
+  @Override
+  protected void setup(Context context) {
+    doSetup(context);
+
+    Configuration conf = context.getConfiguration();
+
+    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
+    if (parser.getRowKeyColumnIndex() == -1) {
+      throw new RuntimeException("No row key column specified");
+    }
+  }
+
+  /**
+   * Handles common parameter initialization that a subclass might want to leverage.
+   * @param context
+   */
+  protected void doSetup(Context context) {
+    Configuration conf = context.getConfiguration();
+
+    // If a custom separator has been used,
+    // decode it back from Base64 encoding.
+    separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
+    if (separator == null) {
+      separator = ImportTsv.DEFAULT_SEPARATOR;
+    } else {
+      separator = new String(Base64.decode(separator));
+    }
+
+    // Should never get 0 as we are setting this to a valid value in job configuration.
+    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
+
+    skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
+    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+  }
+
+  @Override
+  protected void reduce(
+      ImmutableBytesWritable rowKey,
+      java.lang.Iterable<Text> lines,
+      Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue>.Context context)
+      throws java.io.IOException, InterruptedException
+  {
+    // although reduce() is called per-row, handle pathological case
+    long threshold = context.getConfiguration().getLong(
+        "reducer.row.threshold", 1L * (1 << 30));
+    Iterator<Text> iter = lines.iterator();
+    while (iter.hasNext()) {
+      Set<KeyValue> kvs = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+      long curSize = 0;
+      // stop at the end or the RAM threshold
+      while (iter.hasNext() && curSize < threshold) {
+        Text line = iter.next();
+        byte[] lineBytes = line.getBytes();
+        try {
+          ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, line.getLength());
+          // Retrieve timestamp if exists
+          ts = parsed.getTimestamp(ts);
+
+          for (int i = 0; i < parsed.getColumnCount(); i++) {
+            if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()) {
+              continue;
+            }
+            KeyValue kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(),
+                parsed.getRowKeyLength(), parser.getFamily(i), 0,
+                parser.getFamily(i).length, parser.getQualifier(i), 0,
+                parser.getQualifier(i).length, ts, KeyValue.Type.Put,
+                lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
+            kvs.add(kv);
+            curSize += kv.heapSize();
+          }
+        } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
+          if (skipBadLines) {
+            System.err.println("Bad line: " + badLine.getMessage());
+            incrementBadLineCount(1);
+            return;
+          }
+          throw new IOException(badLine);
+        } catch (IllegalArgumentException e) {
+          if (skipBadLines) {
+            System.err.println("Bad line: " + e.getMessage());
+            incrementBadLineCount(1);
+            return;
+          }
+          throw new IOException(e);
+        }
+      }
+      context.setStatus("Read " + kvs.size() + " entries of " + kvs.getClass()
+          + "(" + StringUtils.humanReadableInt(curSize) + ")");
+      int index = 0;
+      for (KeyValue kv : kvs) {
+        context.write(rowKey, kv);
+        if (++index > 0 && index % 100 == 0)
+          context.setStatus("Wrote " + index + " key values.");
+      }
+
+      // if we have more entries to process
+      if (iter.hasNext()) {
+        // force flush because we cannot guarantee intra-row sorted order
+        context.write(null, null);
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
new file mode 100644
index 00000000000..bfebfb5fc61
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Write table content out to map output files.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TsvImporterTextMapper
+    extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text> {
+
+  /** Column separator */
+  private String separator;
+
+  /** Should skip bad lines */
+  private boolean skipBadLines;
+  private Counter badLineCount;
+
+  private ImportTsv.TsvParser parser;
+
+  public boolean getSkipBadLines() {
+    return skipBadLines;
+  }
+
+  public Counter getBadLineCount() {
+    return badLineCount;
+  }
+
+  public void incrementBadLineCount(int count) {
+    this.badLineCount.increment(count);
+  }
+
+  /**
+   * Handles initializing this class with objects specific to it (i.e., the parser).
+   * Common initialization that might be leveraged by a subclass is done in
+   * doSetup. Hence a subclass may choose to override this method
+   * and call doSetup as well before handling its own custom params.
+   *
+   * @param context
+   */
+  @Override
+  protected void setup(Context context) {
+    doSetup(context);
+
+    Configuration conf = context.getConfiguration();
+
+    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
+    if (parser.getRowKeyColumnIndex() == -1) {
+      throw new RuntimeException("No row key column specified");
+    }
+  }
+
+  /**
+   * Handles common parameter initialization that a subclass might want to leverage.
+   * @param context
+   */
+  protected void doSetup(Context context) {
+    Configuration conf = context.getConfiguration();
+
+    // If a custom separator has been used,
+    // decode it back from Base64 encoding.
+    separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
+    if (separator == null) {
+      separator = ImportTsv.DEFAULT_SEPARATOR;
+    } else {
+      separator = new String(Base64.decode(separator));
+    }
+
+    skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
+    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+  }
+
+  /**
+   * Convert a line of TSV text into an HBase table row.
+   */
+  @Override
+  public void map(LongWritable offset, Text value, Context context) throws IOException {
+    try {
+      Pair<Integer, Integer> rowKeyOffsets = parser.parseRowKey(value.getBytes(), value.getLength());
+      // parseRowKey returns the first and last (inclusive) offsets, so the key length is last - first + 1.
+      ImmutableBytesWritable rowKey = new ImmutableBytesWritable(
+          value.getBytes(), rowKeyOffsets.getFirst(),
+          rowKeyOffsets.getSecond() - rowKeyOffsets.getFirst() + 1);
+      context.write(rowKey, value);
+    } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
+      if (skipBadLines) {
+        System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
+        incrementBadLineCount(1);
+        return;
+      }
+      throw new IOException(badLine);
+    } catch (IllegalArgumentException e) {
+      if (skipBadLines) {
+        System.err.println("Bad line at offset: " + offset.get() + ":\n" + e.getMessage());
+        incrementBadLineCount(1);
+        return;
+      } else {
+        throw new IOException(e);
+      }
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
index b8591cf5514..2d93371fab0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
@@ -47,7 +47,10 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.AfterClass;
@@ -181,6 +184,49 @@ public class TestImportTsv implements Configurable {
     util.deleteTable(table);
   }
 
+  @Test
+  public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
+    String table = "test-" + UUID.randomUUID();
+    Path bulkOutputPath = new Path(util.getDataTestDir(table), "hfiles");
+    String INPUT_FILE = "InputFile1.csv";
+    // Prepare the arguments required for the test.
+    String[] args =
+        new String[] {
+            "-D" + ImportTsv.MAPPER_CONF_KEY
+                + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
+            "-D" + ImportTsv.COLUMNS_CONF_KEY
+                + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+            "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
+            "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), table,
+            INPUT_FILE
+        };
+    GenericOptionsParser opts = new GenericOptionsParser(util.getConfiguration(), args);
+    args = opts.getRemainingArgs();
+    Job job = ImportTsv.createSubmittableJob(util.getConfiguration(), args);
+    assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class));
+    assertTrue(job.getReducerClass().equals(TextSortReducer.class));
+    assertTrue(job.getMapOutputValueClass().equals(Text.class));
+  }
+
+  @Test
+  public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
+    String table = "test-" + UUID.randomUUID();
+    String FAMILY = "FAM";
+    Path bulkOutputPath = new Path(util.getDataTestDir(table), "hfiles");
+    // Prepare the arguments required for the test.
+    String[] args =
+        new String[] {
+            "-D" + ImportTsv.MAPPER_CONF_KEY
+                + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
+            "-D" + ImportTsv.COLUMNS_CONF_KEY
+                + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+            "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
+            "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), table
+        };
+    String data = "KEY\u001bVALUE4\u001bVALUE8\n";
+    doMROnTableTest(util, FAMILY, data, args, 4);
+  }
+
   protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
       String[] args) throws Exception {
     return doMROnTableTest(util, family, data, args, 1);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
index 971de462835..edc927bf6a2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -186,4 +187,42 @@ public class TestImportTsvParser {
     byte[] line = Bytes.toBytes("rowkey\tval_a");
     parser.parse(line, line.length);
   }
+
+  @Test
+  public void testTsvParserParseRowKey() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    byte[] line = Bytes.toBytes("rowkey\tval_a\t1234");
+    Pair<Integer, Integer> rowKeyOffsets = parser.parseRowKey(line, line.length);
+    assertEquals(0, rowKeyOffsets.getFirst().intValue());
+    assertEquals(5, rowKeyOffsets.getSecond().intValue());
+    try {
+      line = Bytes.toBytes("\t\tval_a\t1234");
+      parser.parseRowKey(line, line.length);
+      fail("Should get BadTsvLineException on empty rowkey.");
+    } catch (BadTsvLineException b) {
+      // expected
+    }
+    parser = new TsvParser("col_a,HBASE_ROW_KEY,HBASE_TS_KEY", "\t");
+    assertEquals(1, parser.getRowKeyColumnIndex());
+    line = Bytes.toBytes("val_a\trowkey\t1234");
+    rowKeyOffsets = parser.parseRowKey(line, line.length);
+    assertEquals(6, rowKeyOffsets.getFirst().intValue());
+    assertEquals(11, rowKeyOffsets.getSecond().intValue());
+    try {
+      line = Bytes.toBytes("val_a");
+      rowKeyOffsets = parser.parseRowKey(line, line.length);
+      fail("Should get BadTsvLineException when number of columns less than rowkey position.");
+    } catch (BadTsvLineException b) {
+      // expected
+    }
+    parser = new TsvParser("col_a,HBASE_TS_KEY,HBASE_ROW_KEY", "\t");
+    assertEquals(2, parser.getRowKeyColumnIndex());
+    line = Bytes.toBytes("val_a\t1234\trowkey");
+    rowKeyOffsets = parser.parseRowKey(line, line.length);
+    assertEquals(11, rowKeyOffsets.getFirst().intValue());
+    assertEquals(16, rowKeyOffsets.getSecond().intValue());
+  }
+
 }
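
For context, below is a minimal usage sketch (not part of the patch) of how the new text-mapper bulk-output path could be driven from client code. It mirrors the -D arguments used by the new tests; the driver class name, table name, column spec, and paths are illustrative assumptions only.

// Hypothetical driver, for illustration; it simply runs ImportTsv through ToolRunner
// with the mapper and bulk-output options exercised by the tests above.
package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.util.ToolRunner;

public class ImportTsvTextMapperExample {
  public static void main(String[] argv) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String[] args = new String[] {
        "-D" + ImportTsv.MAPPER_CONF_KEY
            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=/tmp/example-hfiles",
        "exampleTable", "/tmp/example-input.csv" };
    // With these options, ImportTsv emits Text map output, sorts it in
    // TextSortReducer, and writes HFiles under the bulk output path instead
    // of issuing Puts against the table.
    int ret = ToolRunner.run(conf, new ImportTsv(), args);
    System.exit(ret);
  }
}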