HBASE-14380 Correct data gets skipped along with bad data in importTsv bulk load thru TsvImporterTextMapper (Bhupendra Kumar Jain)

This commit is contained in:
tedyu 2015-09-11 09:05:17 -07:00
parent c438052cc2
commit a8730c2839
2 changed files with 63 additions and 8 deletions

View File

@ -189,14 +189,14 @@ public class TextSortReducer extends
if (skipBadLines) {
System.err.println("Bad line." + badLine.getMessage());
incrementBadLineCount(1);
return;
continue;
}
throw new IOException(badLine);
} catch (IllegalArgumentException e) {
if (skipBadLines) {
System.err.println("Bad line." + e.getMessage());
incrementBadLineCount(1);
return;
continue;
}
throw new IOException(e);
}

View File

@ -51,6 +51,9 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -346,13 +349,34 @@ public class TestImportTsv implements Configurable {
util.deleteTable(table);
}
/**
* If there are invalid data rows as inputs, then only those rows should be ignored.
*/
@Test
public void testTsvImporterTextMapperWithInvalidData() throws Exception {
Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
// 3 Rows of data as input. 2 Rows are valid and 1 row is invalid as it doesn't have TS
String data = "KEY,1234,VALUE1,VALUE2\nKEY\nKEY,1235,VALUE1,VALUE2\n";
doMROnTableTest(data, 1, 4);
util.deleteTable(table);
}
private Tool doMROnTableTest(String data, int valueMultiplier,int expectedKVCount)
throws Exception {
return doMROnTableTest(util, table, FAMILY, data, args, valueMultiplier,expectedKVCount);
}
private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception {
return doMROnTableTest(util, table, FAMILY, data, args, valueMultiplier);
return doMROnTableTest(util, table, FAMILY, data, args, valueMultiplier,-1);
}
protected static Tool doMROnTableTest(HBaseTestingUtility util, String table,
String family, String data, Map<String, String> args) throws Exception {
return doMROnTableTest(util, table, family, data, args, 1);
return doMROnTableTest(util, table, family, data, args, 1,-1);
}
/**
@ -364,7 +388,7 @@ public class TestImportTsv implements Configurable {
* @return The Tool instance used to run the test.
*/
protected static Tool doMROnTableTest(HBaseTestingUtility util, String table,
String family, String data, Map<String, String> args, int valueMultiplier)
String family, String data, Map<String, String> args, int valueMultiplier,int expectedKVCount)
throws Exception {
Configuration conf = new Configuration(util.getConfiguration());
@ -412,7 +436,7 @@ public class TestImportTsv implements Configurable {
ImportTsv.BULK_OUTPUT_CONF_KEY),
fs.exists(new Path(ImportTsv.BULK_OUTPUT_CONF_KEY)));
} else {
validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family);
validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family,expectedKVCount);
}
} else {
validateTable(conf, TableName.valueOf(table), family, valueMultiplier, isDryRun);
@ -479,13 +503,14 @@ public class TestImportTsv implements Configurable {
/**
* Confirm ImportTsv via HFiles on fs.
*/
private static void validateHFiles(FileSystem fs, String outputPath, String family)
throws IOException {
private static void validateHFiles(FileSystem fs, String outputPath, String family,
int expectedKVCount) throws IOException {
// validate number and content of output columns
LOG.debug("Validating HFiles.");
Set<String> configFamilies = new HashSet<String>();
configFamilies.add(family);
Set<String> foundFamilies = new HashSet<String>();
int actualKVCount = 0;
for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
String cf = elements[elements.length - 1];
@ -499,10 +524,40 @@ public class TestImportTsv implements Configurable {
assertTrue(
String.format("HFile %s appears to contain no data.", hfile.getPath()),
hfile.getLen() > 0);
// count the number of KVs from all the hfiles
if (expectedKVCount > -1) {
actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
}
}
}
assertTrue(String.format("HFile output does not contain the input family '%s'.", family),
foundFamilies.contains(family));
if (expectedKVCount > -1) {
assertTrue(String.format(
"KV count in ouput hfile=<%d> doesn't match with expected KV count=<%d>", actualKVCount,
expectedKVCount), actualKVCount == expectedKVCount);
}
}
/**
* Method returns the total KVs in given hfile
* @param fs File System
* @param p HFile path
* @return KV count in the given hfile
* @throws IOException
*/
private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
Configuration conf = util.getConfiguration();
HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), conf);
reader.loadFileInfo();
HFileScanner scanner = reader.getScanner(false, false);
scanner.seekTo();
int count = 0;
do {
count++;
} while (scanner.next());
reader.close();
return count;
}
}