diff --git a/CHANGES.txt b/CHANGES.txt index b0cceee4b46..cde4eb85f31 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -147,6 +147,8 @@ Release 0.90.2 - Unreleased HBASE-3542 MultiGet methods in Thrift HBASE-3285 Hlog recovery takes too much time HBASE-3586 Improve the selection of regions to balance + HBASE-3623 Allow non-XML representable separator characters in the ImportTSV tool + (Harsh J Chouraria via Stack) Release 0.90.1 - Unreleased diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java index e28e06f1ad3..b7b8001e8ab 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java @@ -19,6 +19,8 @@ */ package org.apache.hadoop.hbase.mapreduce; +import org.apache.hadoop.hbase.util.Base64; + import java.io.IOException; import java.util.ArrayList; @@ -203,8 +205,18 @@ public class ImportTsv { @Override protected void setup(Context context) { Configuration conf = context.getConfiguration(); + + // If a custom separator has been used, + // decode it back from Base64 encoding. + String separator = conf.get(SEPARATOR_CONF_KEY); + if (separator == null) { + separator = DEFAULT_SEPARATOR; + } else { + separator = new String(Base64.decode(separator)); + } + parser = new TsvParser(conf.get(COLUMNS_CONF_KEY), - conf.get(SEPARATOR_CONF_KEY, DEFAULT_SEPARATOR)); + separator); if (parser.getRowKeyColumnIndex() == -1) { throw new RuntimeException("No row key column specified"); } @@ -271,6 +283,15 @@ public class ImportTsv { */ public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException { + + // Support non-XML supported characters + // by re-encoding the passed separator as a Base64 string. + String actualSeparator = conf.get(SEPARATOR_CONF_KEY); + if (actualSeparator != null) { + conf.set(SEPARATOR_CONF_KEY, new String( + Base64.encodeBytes(actualSeparator.getBytes()))); + } + String tableName = args[0]; Path inputDir = new Path(args[1]); Job job = new Job(conf, NAME + "_" + tableName); diff --git a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java index c6f3603e4ca..274c91dbff8 100644 --- a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java +++ b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java @@ -19,13 +19,33 @@ */ package org.apache.hadoop.hbase.mapreduce; +import java.io.UnsupportedEncodingException; +import java.util.List; import java.util.ArrayList; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.GenericOptionsParser; + import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser; import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException; import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.Result; + import org.junit.Test; import com.google.common.base.Joiner; @@ -35,6 +55,7 @@ import com.google.common.collect.Iterables; import static org.junit.Assert.*; public class TestImportTsv { + @Test public void testTsvParserSpecParsing() { TsvParser parser; @@ -125,4 +146,94 @@ public class TestImportTsv { byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key"); ParsedLine parsed = parser.parse(line, line.length); } + + @Test + public void testMROnTable() + throws Exception { + String TABLE_NAME = "TestTable"; + String FAMILY = "FAM"; + String INPUT_FILE = "InputFile.esv"; + + // Prepare the arguments required for the test. + String[] args = new String[] { + "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", + TABLE_NAME, + INPUT_FILE + }; + + // Cluster + HBaseTestingUtility htu1 = new HBaseTestingUtility(); + + MiniHBaseCluster cluster = htu1.startMiniCluster(); + + GenericOptionsParser opts = new GenericOptionsParser(cluster.getConfiguration(), args); + Configuration conf = opts.getConfiguration(); + args = opts.getRemainingArgs(); + + try { + + FileSystem fs = FileSystem.get(conf); + FSDataOutputStream op = fs.create(new Path(INPUT_FILE), true); + String line = "KEY\u001bVALUE1\u001bVALUE2\n"; + op.write(line.getBytes(HConstants.UTF8_ENCODING)); + op.close(); + + final byte[] FAM = Bytes.toBytes(FAMILY); + final byte[] TAB = Bytes.toBytes(TABLE_NAME); + final byte[] QA = Bytes.toBytes("A"); + final byte[] QB = Bytes.toBytes("B"); + + HTableDescriptor desc = new HTableDescriptor(TAB); + desc.addFamily(new HColumnDescriptor(FAM)); + new HBaseAdmin(conf).createTable(desc); + + Job job = ImportTsv.createSubmittableJob(conf, args); + job.waitForCompletion(false); + assertTrue(job.isSuccessful()); + + HTable table = new HTable(new Configuration(conf), TAB); + boolean verified = false; + long pause = conf.getLong("hbase.client.pause", 5 * 1000); + int numRetries = conf.getInt("hbase.client.retries.number", 5); + for (int i = 0; i < numRetries; i++) { + try { + Scan scan = new Scan(); + // Scan entire family. + scan.addFamily(FAM); + ResultScanner resScanner = table.getScanner(scan); + for (Result res : resScanner) { + assertTrue(res.size() == 2); + List kvs = res.list(); + assertEquals(toU8Str(kvs.get(0).getRow()), + toU8Str(Bytes.toBytes("KEY"))); + assertEquals(toU8Str(kvs.get(1).getRow()), + toU8Str(Bytes.toBytes("KEY"))); + assertEquals(toU8Str(kvs.get(0).getValue()), + toU8Str(Bytes.toBytes("VALUE1"))); + assertEquals(toU8Str(kvs.get(1).getValue()), + toU8Str(Bytes.toBytes("VALUE2"))); + // Only one result set is expected, so let it loop. + } + verified = true; + break; + } catch (NullPointerException e) { + // If here, a cell was empty. Presume its because updates came in + // after the scanner had been opened. Wait a while and retry. + } + try { + Thread.sleep(pause); + } catch (InterruptedException e) { + // continue + } + } + assertTrue(verified); + } finally { + cluster.shutdown(); + } + } + + public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException { + return new String(bytes, HConstants.UTF8_ENCODING); + } }