HBASE-7938 Add integration test for ImportTsv/LoadIncrementalHFiles workflow (Nick Dimiduk)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1456700 13f79535-47bb-0310-9956-ffa450edef68
Enis Soztutar 2013-03-14 22:28:08 +00:00
parent 83ffbc8de1
commit 48debe39f0
4 changed files with 645 additions and 261 deletions

org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java

@@ -0,0 +1,191 @@
package org.apache.hadoop.hbase.mapreduce;
import static java.lang.String.format;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.IntegrationTests;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Validate ImportTsv + LoadIncrementalHFiles on a distributed cluster.
*/
@Category(IntegrationTests.class)
public class IntegrationTestImportTsv implements Configurable, Tool {
private static final String NAME = IntegrationTestImportTsv.class.getSimpleName();
protected static final Log LOG = LogFactory.getLog(IntegrationTestImportTsv.class);
protected static final String simple_tsv =
"row1\t1\tc1\tc2\n" +
"row2\t1\tc1\tc2\n" +
"row3\t1\tc1\tc2\n" +
"row4\t1\tc1\tc2\n" +
"row5\t1\tc1\tc2\n" +
"row6\t1\tc1\tc2\n" +
"row7\t1\tc1\tc2\n" +
"row8\t1\tc1\tc2\n" +
"row9\t1\tc1\tc2\n" +
"row10\t1\tc1\tc2\n";
protected static final Set<KeyValue> simple_expected =
new TreeSet<KeyValue>(KeyValue.COMPARATOR) {
private static final long serialVersionUID = 1L;
{
byte[] family = Bytes.toBytes("d");
for (String line : simple_tsv.split("\n")) {
String[] row = line.split("\t");
byte[] key = Bytes.toBytes(row[0]);
long ts = Long.parseLong(row[1]);
byte[][] fields = { Bytes.toBytes(row[2]), Bytes.toBytes(row[3]) };
add(new KeyValue(key, family, fields[0], ts, Type.Put, fields[0]));
add(new KeyValue(key, family, fields[1], ts, Type.Put, fields[1]));
}
}
};
// This instance is initialized by provisionCluster() when the test is run from
// JUnit/Maven, or by main() when run from the CLI.
protected static IntegrationTestingUtility util = null;
public Configuration getConf() {
return util.getConfiguration();
}
public void setConf(Configuration conf) {
throw new IllegalArgumentException("setConf not supported");
}
@BeforeClass
public static void provisionCluster() throws Exception {
if (null == util) {
util = new IntegrationTestingUtility();
}
util.initializeCluster(1);
}
@AfterClass
public static void releaseCluster() throws Exception {
util.restoreCluster();
util = null;
}
/**
* Run LoadIncrementalHFiles on the given <code>hfiles</code> directory and
* verify the loaded table contents match <code>simple_expected</code>.
*/
protected void doLoadIncrementalHFiles(Path hfiles, String tableName)
throws Exception {
String[] args = { hfiles.toString(), tableName };
LOG.info(format("Running LoadIncrementalHFiles with args: %s", Arrays.asList(args)));
assertEquals("Loading HFiles failed.",
0, ToolRunner.run(new LoadIncrementalHFiles(new Configuration(getConf())), args));
HTable table = null;
// Configure the scan via an instance-initializer block on an anonymous Scan subclass.
Scan scan = new Scan() {{
setCacheBlocks(false);
setCaching(1000);
}};
try {
table = new HTable(getConf(), tableName);
Iterator<Result> resultsIt = table.getScanner(scan).iterator();
Iterator<KeyValue> expectedIt = simple_expected.iterator();
while (resultsIt.hasNext() && expectedIt.hasNext()) {
Result r = resultsIt.next();
for (KeyValue actual : r.raw()) {
assertTrue(
"Ran out of expected values prematurely!",
expectedIt.hasNext());
KeyValue expected = expectedIt.next();
assertTrue(
format("Scan produced surprising result. expected: <%s>, actual: %s",
expected, actual),
KeyValue.COMPARATOR.compare(expected, actual) == 0);
}
}
assertFalse("Did not consume all expected values.", expectedIt.hasNext());
assertFalse("Did not consume all scan results.", resultsIt.hasNext());
} finally {
if (null != table) table.close();
}
}
@Test
public void testGenerateAndLoad() throws Exception {
String table = NAME + "-" + UUID.randomUUID();
String cf = "d";
Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
String[] args = {
format("-D%s=%s", ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles),
format("-D%s=HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2",
ImportTsv.COLUMNS_CONF_KEY, cf, cf),
// configure the test harness to NOT delete the HFiles after they're
// generated. We need those for doLoadIncrementalHFiles
format("-D%s=false", TestImportTsv.DELETE_AFTER_LOAD_CONF),
table
};
// run the job, complete the load.
util.createTable(table, cf);
TestImportTsv.doMROnTableTest(util, cf, simple_tsv, args);
doLoadIncrementalHFiles(hfiles, table);
util.deleteTable(table);
util.cleanupDataTestDirOnTestFS(table);
}
public int run(String[] args) throws Exception {
if (args.length != 0) {
System.err.println(format("%s [genericOptions]", NAME));
System.err.println(" Runs ImportTsv integration tests against a distributed cluster.");
System.err.println();
GenericOptionsParser.printGenericCommandUsage(System.err);
return 1;
}
// adding more test methods? Don't forget to add them here... or consider doing what
// IntegrationTestsDriver does.
provisionCluster();
testGenerateAndLoad();
releaseCluster();
return 0;
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
IntegrationTestingUtility.setUseDistributedCluster(conf);
util = new IntegrationTestingUtility(conf);
// not using ToolRunner to avoid unnecessary call to setConf()
args = new GenericOptionsParser(conf, args).getRemainingArgs();
int status = new IntegrationTestImportTsv().run(args);
System.exit(status);
}
}
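For reference, the main() above means a run against a live cluster can be launched with HBase's generic class runner, e.g. hbase org.apache.hadoop.hbase.mapreduce.IntegrationTestImportTsv (a sketch; it assumes the integration-test classes are already on the launcher's classpath).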

org/apache/hadoop/hbase/HBaseTestingUtility.java

@@ -315,7 +315,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
/**
* @return Where to write test data on the test filesystem; Returns working directory
* for the test filesytem by default
* for the test filesystem by default
* @see #setupDataTestDirOnTestFS()
* @see #getTestFileSystem()
*/
@@ -396,6 +396,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
FileSystem fs = getTestFileSystem();
if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
File dataTestDir = new File(getDataTestDir().toString());
dataTestDir.deleteOnExit();
dataTestDirOnTestFS = new Path(dataTestDir.getAbsolutePath());
} else {
Path base = getBaseTestDirOnTestFS();
@@ -404,6 +405,29 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
fs.deleteOnExit(dataTestDirOnTestFS);
}
}
/**
* Cleans the test data directory on the test filesystem.
* @return True if we removed the test dirs
* @throws IOException
*/
public boolean cleanupDataTestDirOnTestFS() throws IOException {
boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
if (ret)
dataTestDirOnTestFS = null;
return ret;
}
/**
* Cleans a subdirectory under the test data directory on the test filesystem.
* @return True if we removed child
* @throws IOException
*/
public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
Path cpath = getDataTestDirOnTestFS(subdirName);
return getTestFileSystem().delete(cpath, true);
}
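// Illustrative pairing (hypothetical usage, not part of this patch): a test can
// stage job data under a named subdirectory and then remove only that subtree:
//   Path staging = util.getDataTestDirOnTestFS("myTest"); // staging dir on the test FS
//   // ... write job inputs/outputs under staging ...
//   util.cleanupDataTestDirOnTestFS("myTest");            // deletes just this subtree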
/**
* Start a minidfscluster.
* @param servers How many DNs to start.
@@ -959,6 +983,33 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
getMiniHBaseCluster().compact(tableName, major);
}
/**
* Create a table.
* @param tableName
* @param family
* @return An HTable instance for the created table.
* @throws IOException
*/
public HTable createTable(String tableName, String family)
throws IOException {
return createTable(tableName, new String[] { family });
}
/**
* Create a table.
* @param tableName
* @param families
* @return An HTable instance for the created table.
* @throws IOException
*/
public HTable createTable(String tableName, String[] families)
throws IOException {
List<byte[]> fams = new ArrayList<byte[]>(families.length);
for (String family : families) {
fams.add(Bytes.toBytes(family));
}
return createTable(Bytes.toBytes(tableName), fams.toArray(new byte[0][]));
}
/**
* Create a table.
@@ -1117,6 +1168,14 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return new HTable(new Configuration(getConfiguration()), tableName);
}
/**
* Drop an existing table
* @param tableName existing table
*/
public void deleteTable(String tableName) throws IOException {
deleteTable(Bytes.toBytes(tableName));
}
/**
* Drop an existing table
* @param tableName existing table

org/apache/hadoop/hbase/mapreduce/TestImportTsv.java

@@ -18,343 +18,306 @@
*/
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.UnsupportedEncodingException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
@Category(LargeTests.class)
public class TestImportTsv implements Configurable {
@Category(MediumTests.class)
public class TestImportTsv {
private static final Log LOG = LogFactory.getLog(TestImportTsv.class);
@Test
public void testTsvParserSpecParsing() {
TsvParser parser;
parser = new TsvParser("HBASE_ROW_KEY", "\t");
assertNull(parser.getFamily(0));
assertNull(parser.getQualifier(0));
assertEquals(0, parser.getRowKeyColumnIndex());
assertFalse(parser.hasTimestamp());
parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t");
assertNull(parser.getFamily(0));
assertNull(parser.getQualifier(0));
assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
assertEquals(0, parser.getRowKeyColumnIndex());
assertFalse(parser.hasTimestamp());
parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t");
assertNull(parser.getFamily(0));
assertNull(parser.getQualifier(0));
assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2));
assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2));
assertEquals(0, parser.getRowKeyColumnIndex());
assertFalse(parser.hasTimestamp());
parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2",
"\t");
assertNull(parser.getFamily(0));
assertNull(parser.getQualifier(0));
assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
assertEquals(0, parser.getRowKeyColumnIndex());
assertTrue(parser.hasTimestamp());
assertEquals(2, parser.getTimestampKeyColumnIndex());
}
@Test
public void testTsvParser() throws BadTsvLineException {
TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d", "\t");
assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0));
assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0));
assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1));
assertBytesEquals(Bytes.toBytes("qual"), parser.getQualifier(1));
assertNull(parser.getFamily(2));
assertNull(parser.getQualifier(2));
assertEquals(2, parser.getRowKeyColumnIndex());
assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX, parser
.getTimestampKeyColumnIndex());
byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
ParsedLine parsed = parser.parse(line, line.length);
checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
}
@Test
public void testTsvParserWithTimestamp() throws BadTsvLineException {
TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
assertNull(parser.getFamily(0));
assertNull(parser.getQualifier(0));
assertNull(parser.getFamily(1));
assertNull(parser.getQualifier(1));
assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(2));
assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(2));
assertEquals(0, parser.getRowKeyColumnIndex());
assertEquals(1, parser.getTimestampKeyColumnIndex());
byte[] line = Bytes.toBytes("rowkey\t1234\tval_a");
ParsedLine parsed = parser.parse(line, line.length);
assertEquals(1234l, parsed.getTimestamp(-1));
checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
}
private void checkParsing(ParsedLine parsed, Iterable<String> expected) {
ArrayList<String> parsedCols = new ArrayList<String>();
for (int i = 0; i < parsed.getColumnCount(); i++) {
parsedCols.add(Bytes.toString(
parsed.getLineBytes(),
parsed.getColumnOffset(i),
parsed.getColumnLength(i)));
}
if (!Iterables.elementsEqual(parsedCols, expected)) {
fail("Expected: " + Joiner.on(",").join(expected) + "\n" +
"Got:" + Joiner.on(",").join(parsedCols));
}
}
private void assertBytesEquals(byte[] a, byte[] b) {
assertEquals(Bytes.toStringBinary(a), Bytes.toStringBinary(b));
}
protected static final Log LOG = LogFactory.getLog(TestImportTsv.class);
protected static final String NAME = TestImportTsv.class.getSimpleName();
protected static HBaseTestingUtility util = new HBaseTestingUtility();
/**
* Test cases that throw BadTsvLineException
*/
@Test(expected=BadTsvLineException.class)
public void testTsvParserBadTsvLineExcessiveColumns() throws BadTsvLineException {
TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
byte[] line = Bytes.toBytes("val_a\tval_b\tval_c");
parser.parse(line, line.length);
}
@Test(expected=BadTsvLineException.class)
public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException {
TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
byte[] line = Bytes.toBytes("");
parser.parse(line, line.length);
}
@Test(expected=BadTsvLineException.class)
public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException {
TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
byte[] line = Bytes.toBytes("key_only");
parser.parse(line, line.length);
}
@Test(expected=BadTsvLineException.class)
public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException {
TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t");
byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
parser.parse(line, line.length);
}
@Test(expected = BadTsvLineException.class)
public void testTsvParserInvalidTimestamp() throws BadTsvLineException {
TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
assertEquals(1, parser.getTimestampKeyColumnIndex());
byte[] line = Bytes.toBytes("rowkey\ttimestamp\tval_a");
ParsedLine parsed = parser.parse(line, line.length);
assertEquals(-1, parsed.getTimestamp(-1));
checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
}
@Test(expected = BadTsvLineException.class)
public void testTsvParserNoTimestampValue() throws BadTsvLineException {
TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
assertEquals(2, parser.getTimestampKeyColumnIndex());
byte[] line = Bytes.toBytes("rowkey\tval_a");
parser.parse(line, line.length);
}
/**
* Delete the tmp directory after running doMROnTableTest. Boolean. Default is
* false.
*/
protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
/**
* Force use of combiner in doMROnTableTest. Boolean. Default is true.
*/
protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
private final String FAMILY = "FAM";
public Configuration getConf() {
return util.getConfiguration();
}
public void setConf(Configuration conf) {
throw new IllegalArgumentException("setConf not supported");
}
@BeforeClass
public static void provisionCluster() throws Exception {
util.startMiniCluster();
util.startMiniMapReduceCluster();
}
@AfterClass
public static void releaseCluster() throws Exception {
util.shutdownMiniMapReduceCluster();
util.shutdownMiniCluster();
}
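// Illustrative use of DELETE_AFTER_LOAD_CONF and FORCE_COMBINER_CONF (hypothetical
// values, not part of this patch); doMROnTableTest reads both from the job
// Configuration, so callers can flip them before invoking it:
//   Configuration conf = util.getConfiguration();
//   conf.setBoolean(DELETE_AFTER_LOAD_CONF, false); // keep HFiles for a follow-on bulk load
//   conf.setBoolean(FORCE_COMBINER_CONF, false);    // do not force the combiner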
@Test
public void testMROnTable()
throws Exception {
String TABLE_NAME = "TestTable";
String FAMILY = "FAM";
String INPUT_FILE = "InputFile.esv";
// Prepare the arguments required for the test.
String[] args = new String[] {
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
TABLE_NAME,
INPUT_FILE
};
doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 1);
}
@Test
public void testMROnTable() throws Exception {
String table = "test-" + UUID.randomUUID();
// Prepare the arguments required for the test.
String[] args = new String[] {
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
table
};
util.createTable(table, FAMILY);
doMROnTableTest(util, FAMILY, null, args, 1);
util.deleteTable(table);
}
@Test
public void testMROnTableWithTimestamp() throws Exception {
String TABLE_NAME = "TestTable";
String FAMILY = "FAM";
String INPUT_FILE = "InputFile1.csv";
// Prepare the arguments required for the test.
String[] args = new String[] {
"-D" + ImportTsv.COLUMNS_CONF_KEY
+ "=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B",
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", TABLE_NAME, INPUT_FILE };
String data = "KEY,1234,VALUE1,VALUE2\n";
doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, data, args, 1);
}
@Test
public void testMROnTableWithTimestamp() throws Exception {
String table = "test-" + UUID.randomUUID();
// Prepare the arguments required for the test.
String[] args = new String[] {
"-D" + ImportTsv.COLUMNS_CONF_KEY
+ "=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B",
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
table
};
String data = "KEY,1234,VALUE1,VALUE2\n";
util.createTable(table, FAMILY);
doMROnTableTest(util, FAMILY, data, args, 1);
util.deleteTable(table);
}
@Test
public void testMROnTableWithCustomMapper()
throws Exception {
String TABLE_NAME = "TestTable";
String FAMILY = "FAM";
String INPUT_FILE = "InputFile2.esv";
// Prepare the arguments required for the test.
String[] args = new String[] {
"-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper",
TABLE_NAME,
INPUT_FILE
};
doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 3);
}
@Test
public void testMROnTableWithCustomMapper() throws Exception {
String table = "test-" + UUID.randomUUID();
// Prepare the arguments required for the test.
String[] args = new String[] {
"-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper",
table
};
util.createTable(table, FAMILY);
doMROnTableTest(util, FAMILY, null, args, 3);
util.deleteTable(table);
}
private void doMROnTableTest(String inputFile, String family,
String tableName, String data, String[] args, int valueMultiplier)
throws Exception {
// Cluster
HBaseTestingUtility htu1 = new HBaseTestingUtility();
htu1.startMiniCluster();
htu1.startMiniMapReduceCluster();
Tool tool = new ImportTsv();
tool.setConf(htu1.getConfiguration());
try {
FileSystem fs = FileSystem.get(tool.getConf());
FSDataOutputStream op = fs.create(new Path(inputFile), true);
if (data == null) {
data = "KEY\u001bVALUE1\u001bVALUE2\n";
}
op.write(Bytes.toBytes(data));
op.close();
LOG.debug(String.format("Wrote test data to file: %s", fs.makeQualified(new Path(inputFile))));
if (tool.getConf().get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(family));
HBaseAdmin admin = new HBaseAdmin(tool.getConf());
admin.createTable(desc);
admin.close();
}
// force use of combiner for testing purposes
tool.getConf().setInt("min.num.spills.for.combine", 1);
assertEquals(0, ToolRunner.run(tool, args));
HTable table = new HTable(tool.getConf(), tableName);
boolean verified = false;
long pause = tool.getConf().getLong("hbase.client.pause", 5 * 1000);
int numRetries = tool.getConf().getInt("hbase.client.retries.number", 5);
for (int i = 0; i < numRetries; i++) {
try {
Scan scan = new Scan();
// Scan entire family.
scan.addFamily(Bytes.toBytes(family));
ResultScanner resScanner = table.getScanner(scan);
for (Result res : resScanner) {
assertTrue(res.size() == 2);
List<KeyValue> kvs = res.list();
assertEquals(toU8Str(kvs.get(0).getRow()),
toU8Str(Bytes.toBytes("KEY")));
assertEquals(toU8Str(kvs.get(1).getRow()),
toU8Str(Bytes.toBytes("KEY")));
assertEquals(toU8Str(kvs.get(0).getValue()),
toU8Str(Bytes.toBytes("VALUE" + valueMultiplier)));
assertEquals(toU8Str(kvs.get(1).getValue()),
toU8Str(Bytes.toBytes("VALUE" + 2*valueMultiplier)));
// Only one result set is expected, so let it loop.
}
verified = true;
break;
} catch (NullPointerException e) {
// If here, a cell was empty. Presume its because updates came in
// after the scanner had been opened. Wait a while and retry.
}
try {
Thread.sleep(pause);
} catch (InterruptedException e) {
// continue
}
}
table.close();
assertTrue(verified);
} finally {
htu1.shutdownMiniMapReduceCluster();
htu1.shutdownMiniCluster();
}
}
@Test
public void testBulkOutputWithoutAnExistingTable() throws Exception {
String TABLE_NAME = "TestTable";
String FAMILY = "FAM";
String INPUT_FILE = "InputFile2.esv";
// Prepare the arguments required for the test.
String[] args = new String[] {
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
"-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=output", TABLE_NAME,
INPUT_FILE };
doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 3);
}
public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException {
return new String(bytes);
}
@Test
public void testBulkOutputWithoutAnExistingTable() throws Exception {
String table = "test-" + UUID.randomUUID();
// Prepare the arguments required for the test.
Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
String[] args = new String[] {
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
"-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
table
};
doMROnTableTest(util, FAMILY, null, args, 3);
util.deleteTable(table);
}
@Test
public void testBulkOutputWithAnExistingTable() throws Exception {
String table = "test-" + UUID.randomUUID();
// Prepare the arguments required for the test.
Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
String[] args = new String[] {
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
"-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
table
};
util.createTable(table, FAMILY);
doMROnTableTest(util, FAMILY, null, args, 3);
util.deleteTable(table);
}
protected static Tool doMROnTableTest(HBaseTestingUtility util, String family,
String data, String[] args) throws Exception {
return doMROnTableTest(util, family, data, args, 1);
}
/**
* Run an ImportTsv job and perform basic validation on the results.
* Returns the ImportTsv <code>Tool</code> instance so that other tests can
* inspect it for further validation as necessary. This method is static to
* ensure non-reliance on the instance's util/conf facilities.
* @param args Any arguments to pass BEFORE inputFile path is appended.
* @return The Tool instance used to run the test.
*/
protected static Tool doMROnTableTest(HBaseTestingUtility util, String family,
String data, String[] args, int valueMultiplier)
throws Exception {
String table = args[args.length - 1];
Configuration conf = new Configuration(util.getConfiguration());
// populate input file
FileSystem fs = FileSystem.get(conf);
Path inputPath = fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table), "input.dat"));
FSDataOutputStream op = fs.create(inputPath, true);
if (data == null) {
data = "KEY\u001bVALUE1\u001bVALUE2\n";
}
op.write(Bytes.toBytes(data));
op.close();
LOG.debug(String.format("Wrote test data to file: %s", inputPath));
if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
LOG.debug("Forcing combiner.");
conf.setInt("min.num.spills.for.combine", 1);
}
// run the import
List<String> argv = new ArrayList<String>(Arrays.asList(args));
argv.add(inputPath.toString());
Tool tool = new ImportTsv();
LOG.debug("Running ImportTsv with arguments: " + argv);
assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
// Perform basic validation. If the input args did not include
// ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
// Otherwise, validate presence of hfiles.
boolean createdHFiles = false;
String outputPath = null;
for (String arg : argv) {
if (arg.contains(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
createdHFiles = true;
// split '-Dfoo=bar' on '=' and keep 'bar'
outputPath = arg.split("=")[1];
break;
}
}
if (createdHFiles)
validateHFiles(fs, outputPath, family);
else
validateTable(conf, table, family, valueMultiplier);
if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
LOG.debug("Deleting test subdirectory");
util.cleanupDataTestDirOnTestFS(table);
}
return tool;
}
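// Illustrative follow-up (hypothetical, not part of this patch): since the Tool is
// returned, a caller can inspect the configuration the job actually ran with:
//   Tool tool = doMROnTableTest(util, FAMILY, null, args);
//   assertTrue(tool.getConf().get(ImportTsv.COLUMNS_CONF_KEY) != null);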
/**
* Confirm ImportTsv via data in online table.
*/
private static void validateTable(Configuration conf, String tableName,
String family, int valueMultiplier) throws IOException {
LOG.debug("Validating table.");
HTable table = new HTable(conf, tableName);
boolean verified = false;
long pause = conf.getLong("hbase.client.pause", 5 * 1000);
int numRetries = conf.getInt("hbase.client.retries.number", 5);
for (int i = 0; i < numRetries; i++) {
try {
Scan scan = new Scan();
// Scan entire family.
scan.addFamily(Bytes.toBytes(family));
ResultScanner resScanner = table.getScanner(scan);
for (Result res : resScanner) {
assertTrue(res.size() == 2);
List<KeyValue> kvs = res.list();
assertArrayEquals(kvs.get(0).getRow(), Bytes.toBytes("KEY"));
assertArrayEquals(kvs.get(1).getRow(), Bytes.toBytes("KEY"));
assertArrayEquals(kvs.get(0).getValue(),
Bytes.toBytes("VALUE" + valueMultiplier));
assertArrayEquals(kvs.get(1).getValue(),
Bytes.toBytes("VALUE" + 2 * valueMultiplier));
// Only one result set is expected, so let it loop.
}
verified = true;
break;
} catch (NullPointerException e) {
// If here, a cell was empty. Presume it's because updates came in
// after the scanner had been opened. Wait a while and retry.
}
try {
Thread.sleep(pause);
} catch (InterruptedException e) {
// continue
}
}
table.close();
assertTrue(verified);
}
/**
* Confirm ImportTsv via HFiles on fs.
*/
private static void validateHFiles(FileSystem fs, String outputPath, String family)
throws IOException {
// validate number and content of output columns
LOG.debug("Validating HFiles.");
Set<String> configFamilies = new HashSet<String>();
configFamilies.add(family);
Set<String> foundFamilies = new HashSet<String>();
for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
String cf = elements[elements.length - 1];
foundFamilies.add(cf);
assertTrue(
String.format(
"HFile ouput contains a column family (%s) not present in input families (%s)",
cf, configFamilies),
configFamilies.contains(cf));
for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
assertTrue(
String.format("HFile %s appears to contain no data.", hfile.getPath()),
hfile.getLen() > 0);
}
}
}
}

org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java

@@ -0,0 +1,171 @@
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
/**
* Tests for {@link TsvParser}.
*/
@Category(SmallTests.class)
public class TestImportTsvParser {
private void assertBytesEquals(byte[] a, byte[] b) {
assertEquals(Bytes.toStringBinary(a), Bytes.toStringBinary(b));
}
private void checkParsing(ParsedLine parsed, Iterable<String> expected) {
ArrayList<String> parsedCols = new ArrayList<String>();
for (int i = 0; i < parsed.getColumnCount(); i++) {
parsedCols.add(Bytes.toString(parsed.getLineBytes(), parsed.getColumnOffset(i),
parsed.getColumnLength(i)));
}
if (!Iterables.elementsEqual(parsedCols, expected)) {
fail("Expected: " + Joiner.on(",").join(expected) + "\n" + "Got:"
+ Joiner.on(",").join(parsedCols));
}
}
@Test
public void testTsvParserSpecParsing() {
TsvParser parser;
parser = new TsvParser("HBASE_ROW_KEY", "\t");
assertNull(parser.getFamily(0));
assertNull(parser.getQualifier(0));
assertEquals(0, parser.getRowKeyColumnIndex());
assertFalse(parser.hasTimestamp());
parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t");
assertNull(parser.getFamily(0));
assertNull(parser.getQualifier(0));
assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
assertEquals(0, parser.getRowKeyColumnIndex());
assertFalse(parser.hasTimestamp());
parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t");
assertNull(parser.getFamily(0));
assertNull(parser.getQualifier(0));
assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2));
assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2));
assertEquals(0, parser.getRowKeyColumnIndex());
assertFalse(parser.hasTimestamp());
parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2", "\t");
assertNull(parser.getFamily(0));
assertNull(parser.getQualifier(0));
assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
assertEquals(0, parser.getRowKeyColumnIndex());
assertTrue(parser.hasTimestamp());
assertEquals(2, parser.getTimestampKeyColumnIndex());
}
@Test
public void testTsvParser() throws BadTsvLineException {
TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d", "\t");
assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0));
assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0));
assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1));
assertBytesEquals(Bytes.toBytes("qual"), parser.getQualifier(1));
assertNull(parser.getFamily(2));
assertNull(parser.getQualifier(2));
assertEquals(2, parser.getRowKeyColumnIndex());
assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX,
parser.getTimestampKeyColumnIndex());
byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
ParsedLine parsed = parser.parse(line, line.length);
checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
}
@Test
public void testTsvParserWithTimestamp() throws BadTsvLineException {
TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
assertNull(parser.getFamily(0));
assertNull(parser.getQualifier(0));
assertNull(parser.getFamily(1));
assertNull(parser.getQualifier(1));
assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(2));
assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(2));
assertEquals(0, parser.getRowKeyColumnIndex());
assertEquals(1, parser.getTimestampKeyColumnIndex());
byte[] line = Bytes.toBytes("rowkey\t1234\tval_a");
ParsedLine parsed = parser.parse(line, line.length);
assertEquals(1234l, parsed.getTimestamp(-1));
checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
}
/**
* Test cases that throw BadTsvLineException
*/
@Test(expected = BadTsvLineException.class)
public void testTsvParserBadTsvLineExcessiveColumns() throws BadTsvLineException {
TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
byte[] line = Bytes.toBytes("val_a\tval_b\tval_c");
parser.parse(line, line.length);
}
@Test(expected = BadTsvLineException.class)
public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException {
TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
byte[] line = Bytes.toBytes("");
parser.parse(line, line.length);
}
@Test(expected = BadTsvLineException.class)
public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException {
TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
byte[] line = Bytes.toBytes("key_only");
parser.parse(line, line.length);
}
@Test(expected = BadTsvLineException.class)
public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException {
TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t");
byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
parser.parse(line, line.length);
}
@Test(expected = BadTsvLineException.class)
public void testTsvParserInvalidTimestamp() throws BadTsvLineException {
TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
assertEquals(1, parser.getTimestampKeyColumnIndex());
byte[] line = Bytes.toBytes("rowkey\ttimestamp\tval_a");
ParsedLine parsed = parser.parse(line, line.length);
assertEquals(-1, parsed.getTimestamp(-1));
checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
}
@Test(expected = BadTsvLineException.class)
public void testTsvParserNoTimestampValue() throws BadTsvLineException {
TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
assertEquals(2, parser.getTimestampKeyColumnIndex());
byte[] line = Bytes.toBytes("rowkey\tval_a");
parser.parse(line, line.length);
}
}