HBASE-3623 Allow non-XML representable separator characters in the ImportTSV tool
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1080449 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d86e7e1c18
commit
897890bca1
|
@ -147,6 +147,8 @@ Release 0.90.2 - Unreleased
|
||||||
HBASE-3542 MultiGet methods in Thrift
|
HBASE-3542 MultiGet methods in Thrift
|
||||||
HBASE-3285 Hlog recovery takes too much time
|
HBASE-3285 Hlog recovery takes too much time
|
||||||
HBASE-3586 Improve the selection of regions to balance
|
HBASE-3586 Improve the selection of regions to balance
|
||||||
|
HBASE-3623 Allow non-XML representable separator characters in the ImportTSV tool
|
||||||
|
(Harsh J Chouraria via Stack)
|
||||||
|
|
||||||
Release 0.90.1 - Unreleased
|
Release 0.90.1 - Unreleased
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.mapreduce;
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.util.Base64;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
@ -203,8 +205,18 @@ public class ImportTsv {
|
||||||
@Override
|
@Override
|
||||||
protected void setup(Context context) {
|
protected void setup(Context context) {
|
||||||
Configuration conf = context.getConfiguration();
|
Configuration conf = context.getConfiguration();
|
||||||
|
|
||||||
|
// If a custom separator has been used,
|
||||||
|
// decode it back from Base64 encoding.
|
||||||
|
String separator = conf.get(SEPARATOR_CONF_KEY);
|
||||||
|
if (separator == null) {
|
||||||
|
separator = DEFAULT_SEPARATOR;
|
||||||
|
} else {
|
||||||
|
separator = new String(Base64.decode(separator));
|
||||||
|
}
|
||||||
|
|
||||||
parser = new TsvParser(conf.get(COLUMNS_CONF_KEY),
|
parser = new TsvParser(conf.get(COLUMNS_CONF_KEY),
|
||||||
conf.get(SEPARATOR_CONF_KEY, DEFAULT_SEPARATOR));
|
separator);
|
||||||
if (parser.getRowKeyColumnIndex() == -1) {
|
if (parser.getRowKeyColumnIndex() == -1) {
|
||||||
throw new RuntimeException("No row key column specified");
|
throw new RuntimeException("No row key column specified");
|
||||||
}
|
}
|
||||||
|
@ -271,6 +283,15 @@ public class ImportTsv {
|
||||||
*/
|
*/
|
||||||
public static Job createSubmittableJob(Configuration conf, String[] args)
|
public static Job createSubmittableJob(Configuration conf, String[] args)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
|
// Support separator characters that cannot be represented in XML
|
||||||
|
// by re-encoding the passed separator as a Base64 string.
|
||||||
|
String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
|
||||||
|
if (actualSeparator != null) {
|
||||||
|
conf.set(SEPARATOR_CONF_KEY, new String(
|
||||||
|
Base64.encodeBytes(actualSeparator.getBytes())));
|
||||||
|
}
|
||||||
|
|
||||||
String tableName = args[0];
|
String tableName = args[0];
|
||||||
Path inputDir = new Path(args[1]);
|
Path inputDir = new Path(args[1]);
|
||||||
Job job = new Job(conf, NAME + "_" + tableName);
|
Job job = new Job(conf, NAME + "_" + tableName);
|
||||||
|
|
|
@ -19,13 +19,33 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.mapreduce;
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import java.io.UnsupportedEncodingException;
|
||||||
|
import java.util.List;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.util.GenericOptionsParser;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
|
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
|
||||||
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
|
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
|
||||||
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
|
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
|
@ -35,6 +55,7 @@ import com.google.common.collect.Iterables;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
public class TestImportTsv {
|
public class TestImportTsv {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTsvParserSpecParsing() {
|
public void testTsvParserSpecParsing() {
|
||||||
TsvParser parser;
|
TsvParser parser;
|
||||||
|
@ -125,4 +146,94 @@ public class TestImportTsv {
|
||||||
byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
|
byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
|
||||||
ParsedLine parsed = parser.parse(line, line.length);
|
ParsedLine parsed = parser.parse(line, line.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMROnTable()
|
||||||
|
throws Exception {
|
||||||
|
String TABLE_NAME = "TestTable";
|
||||||
|
String FAMILY = "FAM";
|
||||||
|
String INPUT_FILE = "InputFile.esv";
|
||||||
|
|
||||||
|
// Prepare the arguments required for the test.
|
||||||
|
String[] args = new String[] {
|
||||||
|
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
|
||||||
|
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
|
||||||
|
TABLE_NAME,
|
||||||
|
INPUT_FILE
|
||||||
|
};
|
||||||
|
|
||||||
|
// Cluster
|
||||||
|
HBaseTestingUtility htu1 = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
MiniHBaseCluster cluster = htu1.startMiniCluster();
|
||||||
|
|
||||||
|
GenericOptionsParser opts = new GenericOptionsParser(cluster.getConfiguration(), args);
|
||||||
|
Configuration conf = opts.getConfiguration();
|
||||||
|
args = opts.getRemainingArgs();
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
FileSystem fs = FileSystem.get(conf);
|
||||||
|
FSDataOutputStream op = fs.create(new Path(INPUT_FILE), true);
|
||||||
|
String line = "KEY\u001bVALUE1\u001bVALUE2\n";
|
||||||
|
op.write(line.getBytes(HConstants.UTF8_ENCODING));
|
||||||
|
op.close();
|
||||||
|
|
||||||
|
final byte[] FAM = Bytes.toBytes(FAMILY);
|
||||||
|
final byte[] TAB = Bytes.toBytes(TABLE_NAME);
|
||||||
|
final byte[] QA = Bytes.toBytes("A");
|
||||||
|
final byte[] QB = Bytes.toBytes("B");
|
||||||
|
|
||||||
|
HTableDescriptor desc = new HTableDescriptor(TAB);
|
||||||
|
desc.addFamily(new HColumnDescriptor(FAM));
|
||||||
|
new HBaseAdmin(conf).createTable(desc);
|
||||||
|
|
||||||
|
Job job = ImportTsv.createSubmittableJob(conf, args);
|
||||||
|
job.waitForCompletion(false);
|
||||||
|
assertTrue(job.isSuccessful());
|
||||||
|
|
||||||
|
HTable table = new HTable(new Configuration(conf), TAB);
|
||||||
|
boolean verified = false;
|
||||||
|
long pause = conf.getLong("hbase.client.pause", 5 * 1000);
|
||||||
|
int numRetries = conf.getInt("hbase.client.retries.number", 5);
|
||||||
|
for (int i = 0; i < numRetries; i++) {
|
||||||
|
try {
|
||||||
|
Scan scan = new Scan();
|
||||||
|
// Scan entire family.
|
||||||
|
scan.addFamily(FAM);
|
||||||
|
ResultScanner resScanner = table.getScanner(scan);
|
||||||
|
for (Result res : resScanner) {
|
||||||
|
assertTrue(res.size() == 2);
|
||||||
|
List<KeyValue> kvs = res.list();
|
||||||
|
assertEquals(toU8Str(kvs.get(0).getRow()),
|
||||||
|
toU8Str(Bytes.toBytes("KEY")));
|
||||||
|
assertEquals(toU8Str(kvs.get(1).getRow()),
|
||||||
|
toU8Str(Bytes.toBytes("KEY")));
|
||||||
|
assertEquals(toU8Str(kvs.get(0).getValue()),
|
||||||
|
toU8Str(Bytes.toBytes("VALUE1")));
|
||||||
|
assertEquals(toU8Str(kvs.get(1).getValue()),
|
||||||
|
toU8Str(Bytes.toBytes("VALUE2")));
|
||||||
|
// Only one result set is expected, so let it loop.
|
||||||
|
}
|
||||||
|
verified = true;
|
||||||
|
break;
|
||||||
|
} catch (NullPointerException e) {
|
||||||
|
// If here, a cell was empty. Presume it's because updates came in
|
||||||
|
// after the scanner had been opened. Wait a while and retry.
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(pause);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertTrue(verified);
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException {
|
||||||
|
return new String(bytes, HConstants.UTF8_ENCODING);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue