HBASE-13702 ImportTsv: Add dry-run functionality and log bad rows (Apekshit Sharma)

This commit is contained in:
tedyu 2015-07-04 07:25:00 -07:00
parent 67b61005fe
commit 9e54e195f6
5 changed files with 312 additions and 174 deletions

View File

@ -24,7 +24,9 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
@ -38,11 +40,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
@ -187,19 +189,18 @@ public class IntegrationTestImportTsv implements Configurable, Tool {
Path hfiles = new Path(
util.getDataTestDirOnTestFS(table.getNameAsString()), "hfiles");
String[] args = {
format("-D%s=%s", ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles),
format("-D%s=HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2",
ImportTsv.COLUMNS_CONF_KEY, cf, cf),
// configure the test harness to NOT delete the HFiles after they're
// generated. We need those for doLoadIncrementalHFiles
format("-D%s=false", TestImportTsv.DELETE_AFTER_LOAD_CONF),
table.getNameAsString()
};
Map<String, String> args = new HashMap<String, String>();
args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
args.put(ImportTsv.COLUMNS_CONF_KEY,
format("HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2", cf, cf));
// configure the test harness to NOT delete the HFiles after they're
// generated. We need those for doLoadIncrementalHFiles
args.put(TestImportTsv.DELETE_AFTER_LOAD_CONF, "false");
// run the job, complete the load.
util.createTable(table, new String[]{cf});
Tool t = TestImportTsv.doMROnTableTest(util, cf, simple_tsv, args);
Tool t = TestImportTsv.doMROnTableTest(util, table.getNameAsString(), cf, simple_tsv, args);
doLoadIncrementalHFiles(hfiles, table);
// validate post-conditions

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@ -53,6 +54,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
@ -86,6 +88,9 @@ public class ImportTsv extends Configured implements Tool {
public final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
// TODO: the rest of these configs are used exclusively by TsvImporterMapper.
// Move them out of the tool and let the mapper handle its own validation.
public final static String DRY_RUN_CONF_KEY = "importtsv.dry.run";
// If true, bad lines are logged to stderr. Default: false.
public final static String LOG_BAD_LINES_CONF_KEY = "importtsv.log.bad.lines";
public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
public final static String COLUMNS_CONF_KEY = "importtsv.columns";
public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
@ -99,6 +104,11 @@ public class ImportTsv extends Configured implements Tool {
final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
public final static String CREATE_TABLE_CONF_KEY = "create.table";
public final static String NO_STRICT_COL_FAMILY = "no.strict";
/**
* If table didn't exist and was created in dry-run mode, this flag is
* flipped to delete it when MR ends.
*/
private static boolean dryRunTableCreated;
public static class TsvParser {
/**
@ -450,9 +460,10 @@ public class ImportTsv extends Configured implements Tool {
* @return The newly created job.
* @throws IOException When setting up the job fails.
*/
public static Job createSubmittableJob(Configuration conf, String[] args)
protected static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException, ClassNotFoundException {
Job job = null;
boolean isDryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
try (Connection connection = ConnectionFactory.createConnection(conf)) {
try (Admin admin = connection.getAdmin()) {
// Support non-XML supported characters
@ -476,6 +487,7 @@ public class ImportTsv extends Configured implements Tool {
FileInputFormat.setInputPaths(job, inputDir);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(mapperClass);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
if(StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
@ -486,13 +498,19 @@ public class ImportTsv extends Configured implements Tool {
if (hfileOutPath != null) {
if (!admin.tableExists(tableName)) {
String errorMsg = format("Table '%s' does not exist.", tableName);
LOG.warn(format("Table '%s' does not exist.", tableName));
if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) {
LOG.warn(errorMsg);
// TODO: this is backwards. Instead of depending on the existence of a table,
// create a sane splits file for HFileOutputFormat based on data sampling.
createTable(admin, tableName, columns);
if (isDryRun) {
LOG.warn("Dry run: Table will be deleted at end of dry run.");
dryRunTableCreated = true;
}
} else {
String errorMsg =
format("Table '%s' does not exist and '%s' is set to no.", tableName,
CREATE_TABLE_CONF_KEY);
LOG.error(errorMsg);
throw new TableNotFoundException(errorMsg);
}
@ -523,21 +541,22 @@ public class ImportTsv extends Configured implements Tool {
+ "=true.\n";
usage(msg);
System.exit(-1);
}
}
}
job.setReducerClass(PutSortReducer.class);
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
if (mapperClass.equals(TsvImporterTextMapper.class)) {
job.setMapOutputValueClass(Text.class);
job.setReducerClass(TextSortReducer.class);
} else {
job.setMapOutputValueClass(Put.class);
job.setCombinerClass(PutCombiner.class);
job.setReducerClass(PutSortReducer.class);
}
if (!isDryRun) {
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
regionLocator);
}
HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
regionLocator);
}
} else {
if (!admin.tableExists(tableName)) {
@ -552,13 +571,20 @@ public class ImportTsv extends Configured implements Tool {
+ " or custom mapper whose value type is Put.");
System.exit(-1);
}
// No reducers. Just write straight to table. Call initTableReducerJob
// to set up the TableOutputFormat.
TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null,
job);
if (!isDryRun) {
// No reducers. Just write straight to table. Call initTableReducerJob
// to set up the TableOutputFormat.
TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
}
job.setNumReduceTasks(0);
}
if (isDryRun) {
job.setOutputFormatClass(NullOutputFormat.class);
job.getConfiguration().setStrings("io.serializations",
job.getConfiguration().get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
}
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
com.google.common.base.Function.class /* Guava used by TsvParser */);
@ -579,7 +605,24 @@ public class ImportTsv extends Configured implements Tool {
tableName, cfSet));
admin.createTable(htd);
}
private static void deleteTable(Configuration conf, String[] args) {
TableName tableName = TableName.valueOf(args[0]);
try (Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin()) {
try {
admin.disableTable(tableName);
} catch (TableNotEnabledException e) {
LOG.debug("Dry mode: Table: " + tableName + " already disabled, so just deleting it.");
}
admin.deleteTable(tableName);
} catch (IOException e) {
LOG.error(format("***Dry run: Failed to delete table '%s'.***\n%s", tableName, e.toString()));
return;
}
LOG.info(format("Dry run: Deleted table '%s'.", tableName));
}
private static Set<String> getColumnFamilies(String[] columns) {
Set<String> cfSet = new HashSet<String>();
for (String aColumn : columns) {
@ -630,7 +673,10 @@ public class ImportTsv extends Configured implements Tool {
" Note: if you do not use this option, then the target table must already exist in HBase\n" +
"\n" +
"Other options that may be specified with -D include:\n" +
" -D" + DRY_RUN_CONF_KEY + "=true - Dry run mode. Data is not actually populated into" +
" table. If table does not exist, it is created but deleted in the end.\n" +
" -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
" -D" + LOG_BAD_LINES_CONF_KEY + "=true - logs invalid lines to stderr\n" +
" '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" +
" -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" +
" -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " +
@ -717,8 +763,13 @@ public class ImportTsv extends Configured implements Tool {
// system time
getConf().setLong(TIMESTAMP_CONF_KEY, timstamp);
dryRunTableCreated = false;
Job job = createSubmittableJob(getConf(), otherArgs);
return job.waitForCompletion(true) ? 0 : 1;
boolean success = job.waitForCompletion(true);
if (dryRunTableCreated) {
deleteTable(getConf(), args);
}
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {

View File

@ -57,6 +57,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
/** Should skip bad lines */
private boolean skipBadLines;
private Counter badLineCount;
private boolean logBadLines;
protected ImportTsv.TsvParser parser;
@ -129,6 +130,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
skipBadLines = context.getConfiguration().getBoolean(
ImportTsv.SKIP_LINES_CONF_KEY, true);
badLineCount = context.getCounter("ImportTsv", "Bad Lines");
logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false);
hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
}
@ -163,26 +165,16 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
populatePut(lineBytes, parsed, put, i);
}
context.write(rowKey, put);
} catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
} catch (ImportTsv.TsvParser.BadTsvLineException|IllegalArgumentException badLine) {
if (logBadLines) {
System.err.println(value);
}
System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
if (skipBadLines) {
System.err.println(
"Bad line at offset: " + offset.get() + ":\n" +
badLine.getMessage());
incrementBadLineCount(1);
return;
} else {
throw new IOException(badLine);
}
} catch (IllegalArgumentException e) {
if (skipBadLines) {
System.err.println(
"Bad line at offset: " + offset.get() + ":\n" +
e.getMessage());
incrementBadLineCount(1);
return;
} else {
throw new IOException(e);
}
throw new IOException(badLine);
} catch (InterruptedException e) {
e.printStackTrace();
}

View File

@ -45,6 +45,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text>
/** Should skip bad lines */
private boolean skipBadLines;
private Counter badLineCount;
private boolean logBadLines;
private ImportTsv.TsvParser parser;
@ -97,6 +98,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text>
}
skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false);
badLineCount = context.getCounter("ImportTsv", "Bad Lines");
}
@ -110,21 +112,16 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text>
ImmutableBytesWritable rowKey = new ImmutableBytesWritable(
value.getBytes(), rowKeyOffests.getFirst(), rowKeyOffests.getSecond());
context.write(rowKey, value);
} catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
} catch (ImportTsv.TsvParser.BadTsvLineException|IllegalArgumentException badLine) {
if (logBadLines) {
System.err.println(value);
}
System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
if (skipBadLines) {
System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
incrementBadLineCount(1);
return;
}
throw new IOException(badLine);
} catch (IllegalArgumentException e) {
if (skipBadLines) {
System.err.println("Bad line at offset: " + offset.get() + ":\n" + e.getMessage());
incrementBadLineCount(1);
return;
} else {
throw new IOException(e);
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();

View File

@ -19,13 +19,15 @@
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@ -39,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@ -53,13 +56,17 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
@Category(LargeTests.class)
public class TestImportTsv implements Configurable {
@ -68,10 +75,7 @@ public class TestImportTsv implements Configurable {
protected static final String NAME = TestImportTsv.class.getSimpleName();
protected static HBaseTestingUtility util = new HBaseTestingUtility();
/**
* Delete the tmp directory after running doMROnTableTest. Boolean. Default is
* false.
*/
// Delete the tmp directory after running doMROnTableTest. Boolean. Default is true.
protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
/**
@ -80,6 +84,11 @@ public class TestImportTsv implements Configurable {
protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
private final String FAMILY = "FAM";
private String table;
private Map<String, String> args;
@Rule
public ExpectedException exception = ExpectedException.none();
public Configuration getConf() {
return util.getConfiguration();
@ -101,112 +110,80 @@ public class TestImportTsv implements Configurable {
util.shutdownMiniCluster();
}
@Before
public void setup() throws Exception {
table = "test-" + UUID.randomUUID();
args = new HashMap<String, String>();
// Prepare the arguments required for the test.
args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B");
args.put(ImportTsv.SEPARATOR_CONF_KEY, "\u001b");
}
@Test
public void testMROnTable() throws Exception {
String table = "test-" + UUID.randomUUID();
// Prepare the arguments required for the test.
String[] args = new String[] {
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
table
};
util.createTable(TableName.valueOf(table), FAMILY);
doMROnTableTest(util, FAMILY, null, args, 1);
doMROnTableTest(null, 1);
util.deleteTable(table);
}
@Test
public void testMROnTableWithTimestamp() throws Exception {
String table = "test-" + UUID.randomUUID();
// Prepare the arguments required for the test.
String[] args = new String[] {
"-D" + ImportTsv.COLUMNS_CONF_KEY
+ "=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B",
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
table
};
util.createTable(TableName.valueOf(table), FAMILY);
args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
String data = "KEY,1234,VALUE1,VALUE2\n";
util.createTable(TableName.valueOf(table), FAMILY);
doMROnTableTest(util, FAMILY, data, args, 1);
doMROnTableTest(data, 1);
util.deleteTable(table);
}
@Test
public void testMROnTableWithCustomMapper()
throws Exception {
String table = "test-" + UUID.randomUUID();
// Prepare the arguments required for the test.
String[] args = new String[] {
"-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper",
table
};
util.createTable(TableName.valueOf(table), FAMILY);
doMROnTableTest(util, FAMILY, null, args, 3);
args.put(ImportTsv.MAPPER_CONF_KEY,
"org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper");
doMROnTableTest(null, 3);
util.deleteTable(table);
}
@Test
public void testBulkOutputWithoutAnExistingTable() throws Exception {
String table = "test-" + UUID.randomUUID();
// Prepare the arguments required for the test.
Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
String[] args = new String[] {
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
"-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
table
};
args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
doMROnTableTest(util, FAMILY, null, args, 3);
doMROnTableTest(null, 3);
util.deleteTable(table);
}
@Test
public void testBulkOutputWithAnExistingTable() throws Exception {
String table = "test-" + UUID.randomUUID();
util.createTable(TableName.valueOf(table), FAMILY);
// Prepare the arguments required for the test.
Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
String[] args = new String[] {
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
"-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
table
};
args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
util.createTable(TableName.valueOf(table), FAMILY);
doMROnTableTest(util, FAMILY, null, args, 3);
doMROnTableTest(null, 3);
util.deleteTable(table);
}
@Test
public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception {
String table = "test-" + UUID.randomUUID();
util.createTable(TableName.valueOf(table), FAMILY);
// Prepare the arguments required for the test.
Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
String[] args = new String[] {
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
"-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
"-D" + ImportTsv.NO_STRICT_COL_FAMILY + "=true",
table
};
util.createTable(TableName.valueOf(table), FAMILY);
doMROnTableTest(util, FAMILY, null, args, 3);
args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
args.put(ImportTsv.NO_STRICT_COL_FAMILY, "true");
doMROnTableTest(null, 3);
util.deleteTable(table);
}
@Test
public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
String table = "test-" + UUID.randomUUID();
Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table),"hfiles");
String INPUT_FILE = "InputFile1.csv";
// Prepare the arguments required for the test.
@ -220,59 +197,164 @@ public class TestImportTsv implements Configurable {
"-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), table,
INPUT_FILE
};
GenericOptionsParser opts = new GenericOptionsParser(util.getConfiguration(), args);
Configuration conf = new Configuration(util.getConfiguration());
GenericOptionsParser opts = new GenericOptionsParser(conf, args);
args = opts.getRemainingArgs();
Job job = ImportTsv.createSubmittableJob(util.getConfiguration(), args);
assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class));
assertTrue(job.getReducerClass().equals(TextSortReducer.class));
assertTrue(job.getMapOutputValueClass().equals(Text.class));
assertEquals("running test job configuration failed.", 0,
ToolRunner.run(conf, new ImportTsv() {
@Override
public int run(String[] args) throws Exception {
Job job = createSubmittableJob(getConf(), args);
assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class));
assertTrue(job.getReducerClass().equals(TextSortReducer.class));
assertTrue(job.getMapOutputValueClass().equals(Text.class));
return 0;
}
}, args));
// Delete table created by createSubmittableJob.
util.deleteTable(table);
}
@Test
public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
String table = "test-" + UUID.randomUUID();
String FAMILY = "FAM";
Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table),"hfiles");
// Prepare the arguments required for the test.
String[] args =
new String[] {
"-D" + ImportTsv.MAPPER_CONF_KEY
+ "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
"-D" + ImportTsv.COLUMNS_CONF_KEY
+ "=HBASE_ROW_KEY,FAM:A,FAM:B",
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
"-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), table
};
args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
String data = "KEY\u001bVALUE4\u001bVALUE8\n";
doMROnTableTest(util, FAMILY, data, args, 4);
doMROnTableTest(data, 4);
util.deleteTable(table);
}
@Test(expected = TableNotFoundException.class)
@Test
public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
String table = "test-" + UUID.randomUUID();
String[] args =
new String[] { table, "/inputFile" };
String[] args = new String[] { table, "/inputFile" };
Configuration conf = new Configuration(util.getConfiguration());
conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A");
conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/output");
conf.set(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
ImportTsv.createSubmittableJob(conf, args);
exception.expect(TableNotFoundException.class);
assertEquals("running test job configuration failed.", 0,
ToolRunner.run(conf, new ImportTsv() {
@Override public int run(String[] args) throws Exception {
createSubmittableJob(getConf(), args);
return 0;
}
}, args));
}
@Test(expected = TableNotFoundException.class)
@Test
public void testMRWithoutAnExistingTable() throws Exception {
String table = "test-" + UUID.randomUUID();
String[] args =
new String[] { table, "/inputFile" };
Configuration conf = new Configuration(util.getConfiguration());
ImportTsv.createSubmittableJob(conf, args);
exception.expect(TableNotFoundException.class);
assertEquals("running test job configuration failed.", 0, ToolRunner.run(
new Configuration(util.getConfiguration()),
new ImportTsv() {
@Override
public int run(String[] args) throws Exception {
createSubmittableJob(getConf(), args);
return 0;
}
}, args));
}
protected static Tool doMROnTableTest(HBaseTestingUtility util, String family,
String data, String[] args) throws Exception {
return doMROnTableTest(util, family, data, args, 1);
@Test
public void testJobConfigurationsWithDryMode() throws Exception {
Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table),"hfiles");
String INPUT_FILE = "InputFile1.csv";
// Prepare the arguments required for the test.
String[] argsArray = new String[] {
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
"-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
"-D" + ImportTsv.DRY_RUN_CONF_KEY + "=true",
table,
INPUT_FILE };
assertEquals("running test job configuration failed.", 0, ToolRunner.run(
new Configuration(util.getConfiguration()),
new ImportTsv() {
@Override
public int run(String[] args) throws Exception {
Job job = createSubmittableJob(getConf(), args);
assertTrue(job.getOutputFormatClass().equals(NullOutputFormat.class));
return 0;
}
}, argsArray));
// Delete table created by createSubmittableJob.
util.deleteTable(table);
}
@Test
public void testDryModeWithoutBulkOutputAndTableExists() throws Exception {
util.createTable(TableName.valueOf(table), FAMILY);
args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
doMROnTableTest(null, 1);
// Dry mode should not delete an existing table. If it's not present,
// this will throw TableNotFoundException.
util.deleteTable(table);
}
/**
* If table is not present in non-bulk mode, dry run should fail just like
* normal mode.
*/
@Test
public void testDryModeWithoutBulkOutputAndTableDoesNotExists() throws Exception {
args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
exception.expect(TableNotFoundException.class);
doMROnTableTest(null, 1);
}
@Test public void testDryModeWithBulkOutputAndTableExists() throws Exception {
util.createTable(TableName.valueOf(table), FAMILY);
// Prepare the arguments required for the test.
Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
doMROnTableTest(null, 1);
// Dry mode should not delete an existing table. If it's not present,
// this will throw TableNotFoundException.
util.deleteTable(table);
}
/**
* If table is not present in bulk mode and create.table is not set to yes,
* import should fail with TableNotFoundException.
*/
@Test
public void testDryModeWithBulkOutputAndTableDoesNotExistsCreateTableSetToNo() throws
Exception {
// Prepare the arguments required for the test.
Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
exception.expect(TableNotFoundException.class);
doMROnTableTest(null, 1);
}
@Test
public void testDryModeWithBulkModeAndTableDoesNotExistsCreateTableSetToYes() throws Exception {
// Prepare the arguments required for the test.
Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "yes");
doMROnTableTest(null, 1);
// Verify temporary table was deleted.
exception.expect(TableNotFoundException.class);
util.deleteTable(table);
}
private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception {
return doMROnTableTest(util, table, FAMILY, data, args, valueMultiplier);
}
protected static Tool doMROnTableTest(HBaseTestingUtility util, String table,
String family, String data, Map<String, String> args) throws Exception {
return doMROnTableTest(util, table, family, data, args, 1);
}
/**
@ -283,10 +365,9 @@ public class TestImportTsv implements Configurable {
* @param args Any arguments to pass BEFORE inputFile path is appended.
* @return The Tool instance used to run the test.
*/
protected static Tool doMROnTableTest(HBaseTestingUtility util, String family,
String data, String[] args, int valueMultiplier)
protected static Tool doMROnTableTest(HBaseTestingUtility util, String table,
String family, String data, Map<String, String> args, int valueMultiplier)
throws Exception {
String table = args[args.length - 1];
Configuration conf = new Configuration(util.getConfiguration());
// populate input file
@ -305,32 +386,40 @@ public class TestImportTsv implements Configurable {
conf.setInt("mapreduce.map.combine.minspills", 1);
}
// Build args array.
String[] argsArray = new String[args.size() + 2];
Iterator it = args.entrySet().iterator();
int i = 0;
while (it.hasNext()) {
Map.Entry pair = (Map.Entry) it.next();
argsArray[i] = "-D" + pair.getKey() + "=" + pair.getValue();
i++;
}
argsArray[i] = table;
argsArray[i + 1] = inputPath.toString();
// run the import
List<String> argv = new ArrayList<String>(Arrays.asList(args));
argv.add(inputPath.toString());
Tool tool = new ImportTsv();
LOG.debug("Running ImportTsv with arguments: " + argv);
assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
LOG.debug("Running ImportTsv with arguments: " + argsArray);
assertEquals(0, ToolRunner.run(conf, tool, argsArray));
// Perform basic validation. If the input args did not include
// ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
// Otherwise, validate presence of hfiles.
boolean createdHFiles = false;
String outputPath = null;
for (String arg : argv) {
if (arg.contains(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
createdHFiles = true;
// split '-Dfoo=bar' on '=' and keep 'bar'
outputPath = arg.split("=")[1];
break;
boolean isDryRun = args.containsKey(ImportTsv.DRY_RUN_CONF_KEY) &&
"true".equalsIgnoreCase(args.get(ImportTsv.DRY_RUN_CONF_KEY));
if (args.containsKey(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
if (isDryRun) {
assertFalse(String.format("Dry run mode, %s should not have been created.",
ImportTsv.BULK_OUTPUT_CONF_KEY),
fs.exists(new Path(ImportTsv.BULK_OUTPUT_CONF_KEY)));
} else {
validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family);
}
} else {
validateTable(conf, TableName.valueOf(table), family, valueMultiplier, isDryRun);
}
if (createdHFiles)
validateHFiles(fs, outputPath, family);
else
validateTable(conf, TableName.valueOf(table), family, valueMultiplier);
if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
LOG.debug("Deleting test subdirectory");
util.cleanupDataTestDirOnTestFS(table);
@ -342,7 +431,7 @@ public class TestImportTsv implements Configurable {
* Confirm ImportTsv via data in online table.
*/
private static void validateTable(Configuration conf, TableName tableName,
String family, int valueMultiplier) throws IOException {
String family, int valueMultiplier, boolean isDryRun) throws IOException {
LOG.debug("Validating table.");
Table table = new HTable(conf, tableName);
@ -355,8 +444,10 @@ public class TestImportTsv implements Configurable {
// Scan entire family.
scan.addFamily(Bytes.toBytes(family));
ResultScanner resScanner = table.getScanner(scan);
int numRows = 0;
for (Result res : resScanner) {
assertTrue(res.size() == 2);
numRows++;
assertEquals(2, res.size());
List<Cell> kvs = res.listCells();
assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY")));
assertTrue(CellUtil.matchingRow(kvs.get(1), Bytes.toBytes("KEY")));
@ -364,6 +455,11 @@ public class TestImportTsv implements Configurable {
assertTrue(CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
// Only one result set is expected, so let it loop.
}
if (isDryRun) {
assertEquals(0, numRows);
} else {
assertEquals(1, numRows);
}
verified = true;
break;
} catch (NullPointerException e) {
@ -385,7 +481,6 @@ public class TestImportTsv implements Configurable {
*/
private static void validateHFiles(FileSystem fs, String outputPath, String family)
throws IOException {
// validate number and content of output columns
LOG.debug("Validating HFiles.");
Set<String> configFamilies = new HashSet<String>();
@ -397,7 +492,7 @@ public class TestImportTsv implements Configurable {
foundFamilies.add(cf);
assertTrue(
String.format(
"HFile ouput contains a column family (%s) not present in input families (%s)",
"HFile output contains a column family (%s) not present in input families (%s)",
cf, configFamilies),
configFamilies.contains(cf));
for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
@ -406,6 +501,8 @@ public class TestImportTsv implements Configurable {
hfile.getLen() > 0);
}
}
assertTrue(String.format("HFile output does not contain the input family '%s'.", family),
foundFamilies.contains(family));
}
}