diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java index 0aaeeb0d6b3..5056f0bf2a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java @@ -189,14 +189,14 @@ public class TextSortReducer extends if (skipBadLines) { System.err.println("Bad line." + badLine.getMessage()); incrementBadLineCount(1); - return; + continue; } throw new IOException(badLine); } catch (IllegalArgumentException e) { if (skipBadLines) { System.err.println("Bad line." + e.getMessage()); incrementBadLineCount(1); - return; + continue; } throw new IOException(e); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java index ca19af4aabb..099ebe1cedb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java @@ -51,6 +51,9 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; import org.apache.hadoop.hbase.util.Bytes; @@ -346,13 +349,34 @@ public class TestImportTsv implements Configurable { util.deleteTable(table); } + /** + * If there are invalid data rows as inputs, then only those rows should be ignored. + */ + @Test + public void testTsvImporterTextMapperWithInvalidData() throws Exception { + Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table), "hfiles"); + args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper"); + args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString()); + args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B"); + args.put(ImportTsv.SEPARATOR_CONF_KEY, ","); + // 3 Rows of data as input. 2 Rows are valid and 1 row is invalid as it doesn't have TS + String data = "KEY,1234,VALUE1,VALUE2\nKEY\nKEY,1235,VALUE1,VALUE2\n"; + doMROnTableTest(data, 1, 4); + util.deleteTable(table); + } + + private Tool doMROnTableTest(String data, int valueMultiplier,int expectedKVCount) + throws Exception { + return doMROnTableTest(util, table, FAMILY, data, args, valueMultiplier,expectedKVCount); + } + private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception { - return doMROnTableTest(util, table, FAMILY, data, args, valueMultiplier); + return doMROnTableTest(util, table, FAMILY, data, args, valueMultiplier,-1); } protected static Tool doMROnTableTest(HBaseTestingUtility util, String table, String family, String data, Map args) throws Exception { - return doMROnTableTest(util, table, family, data, args, 1); + return doMROnTableTest(util, table, family, data, args, 1,-1); } /** @@ -364,7 +388,7 @@ public class TestImportTsv implements Configurable { * @return The Tool instance used to run the test. */ protected static Tool doMROnTableTest(HBaseTestingUtility util, String table, - String family, String data, Map args, int valueMultiplier) + String family, String data, Map args, int valueMultiplier,int expectedKVCount) throws Exception { Configuration conf = new Configuration(util.getConfiguration()); @@ -412,7 +436,7 @@ public class TestImportTsv implements Configurable { ImportTsv.BULK_OUTPUT_CONF_KEY), fs.exists(new Path(ImportTsv.BULK_OUTPUT_CONF_KEY))); } else { - validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family); + validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family,expectedKVCount); } } else { validateTable(conf, TableName.valueOf(table), family, valueMultiplier, isDryRun); @@ -479,13 +503,14 @@ public class TestImportTsv implements Configurable { /** * Confirm ImportTsv via HFiles on fs. */ - private static void validateHFiles(FileSystem fs, String outputPath, String family) - throws IOException { + private static void validateHFiles(FileSystem fs, String outputPath, String family, + int expectedKVCount) throws IOException { // validate number and content of output columns LOG.debug("Validating HFiles."); Set configFamilies = new HashSet(); configFamilies.add(family); Set foundFamilies = new HashSet(); + int actualKVCount = 0; for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) { String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR); String cf = elements[elements.length - 1]; @@ -499,10 +524,40 @@ public class TestImportTsv implements Configurable { assertTrue( String.format("HFile %s appears to contain no data.", hfile.getPath()), hfile.getLen() > 0); + // count the number of KVs from all the hfiles + if (expectedKVCount > -1) { + actualKVCount += getKVCountFromHfile(fs, hfile.getPath()); + } } } assertTrue(String.format("HFile output does not contain the input family '%s'.", family), foundFamilies.contains(family)); + if (expectedKVCount > -1) { + assertTrue(String.format( + "KV count in ouput hfile=<%d> doesn't match with expected KV count=<%d>", actualKVCount, + expectedKVCount), actualKVCount == expectedKVCount); + } + } + + /** + * Method returns the total KVs in given hfile + * @param fs File System + * @param p HFile path + * @return KV count in the given hfile + * @throws IOException + */ + private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException { + Configuration conf = util.getConfiguration(); + HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), conf); + reader.loadFileInfo(); + HFileScanner scanner = reader.getScanner(false, false); + scanner.seekTo(); + int count = 0; + do { + count++; + } while (scanner.next()); + reader.close(); + return count; } }