HBASE-8011 Refactor ImportTsv
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1455284 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
  parent c11bd4a464
  commit df4748b369
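The core of the refactor: ImportTsv now extends Configured and implements Tool, so the job is driven through ToolRunner rather than a bespoke static main(), and the configuration keys it consumes become public. A minimal driver sketch (not part of the patch; the table name, column spec, and paths are illustrative placeholders) of how the refactored tool can be invoked programmatically:

// Hedged sketch only: launches the refactored ImportTsv through ToolRunner,
// mirroring the new main() in this commit. Table, columns, and paths are
// placeholders, not values taken from the patch.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.ImportTsv;
import org.apache.hadoop.util.ToolRunner;

public class ImportTsvDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Column spec: HBASE_ROW_KEY plus one family:qualifier per TSV column.
    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,d:c1,d:c2");
    // Uncomment to write HFiles for bulk load instead of writing Puts directly:
    // conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/tmp/importtsv-out");
    int status = ToolRunner.run(conf, new ImportTsv(), new String[] { "myTable", "/tmp/input" });
    System.exit(status);
  }
}

Command-line behavior is unchanged: the tool is still invoked as importtsv with -Dimporttsv.columns=a,b,c followed by the table name and input directory, as the usage message in the patch describes.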
ImportTsv.java:

@@ -18,14 +18,19 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import static java.lang.String.format;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -42,6 +47,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
@@ -57,20 +64,25 @@ import com.google.common.collect.Lists;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class ImportTsv {
+public class ImportTsv extends Configured implements Tool {
 
+  protected static final Log LOG = LogFactory.getLog(ImportTsv.class);
+
   final static String NAME = "importtsv";
 
-  final static String MAPPER_CONF_KEY = "importtsv.mapper.class";
-  final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
-  final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output";
-  final static String COLUMNS_CONF_KEY = "importtsv.columns";
-  final static String SEPARATOR_CONF_KEY = "importtsv.separator";
-  final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
+  public final static String MAPPER_CONF_KEY = "importtsv.mapper.class";
+  public final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output";
+  public final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
+  // TODO: the rest of these configs are used exclusively by TsvImporterMapper.
+  // Move them out of the tool and let the mapper handle its own validation.
+  public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
+  public final static String COLUMNS_CONF_KEY = "importtsv.columns";
+  public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
 
   final static String DEFAULT_SEPARATOR = "\t";
   final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
-  private static HBaseAdmin hbaseAdmin;
 
-  static class TsvParser {
+  public static class TsvParser {
     /**
      * Column families and qualifiers mapped to the TSV columns
      */
@@ -245,7 +257,9 @@ public class ImportTsv {
    * @throws IOException When setting up the job fails.
    */
   public static Job createSubmittableJob(Configuration conf, String[] args)
   throws IOException, ClassNotFoundException {
 
+    HBaseAdmin admin = new HBaseAdmin(conf);
+
     // Support non-XML supported characters
     // by re-encoding the passed separator as a Base64 string.
@@ -272,9 +286,13 @@ public class ImportTsv {
     job.setMapperClass(mapperClass);
 
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
     if (hfileOutPath != null) {
-      if (!doesTableExist(tableName)) {
-        createTable(conf, tableName);
+      if (!admin.tableExists(tableName)) {
+        LOG.warn(format("Table '%s' does not exist.", tableName));
+        // TODO: this is backwards. Instead of depending on the existence of a table,
+        // create a sane splits file for HFileOutputFormat based on data sampling.
+        createTable(admin, tableName, columns);
       }
       HTable table = new HTable(conf, tableName);
       job.setReducerClass(PutSortReducer.class);
@@ -285,7 +303,7 @@ public class ImportTsv {
       job.setCombinerClass(PutCombiner.class);
       HFileOutputFormat.configureIncrementalLoad(job, table);
     } else {
       // No reducers. Just write straight to table. Call initTableReducerJob
       // to set up the TableOutputFormat.
       TableMapReduceUtil.initTableReducerJob(tableName, null, job);
       job.setNumReduceTasks(0);
@@ -297,14 +315,9 @@ public class ImportTsv {
     return job;
   }
 
-  private static boolean doesTableExist(String tableName) throws IOException {
-    return hbaseAdmin.tableExists(tableName.getBytes());
-  }
-
-  private static void createTable(Configuration conf, String tableName)
+  private static void createTable(HBaseAdmin admin, String tableName, String[] columns)
   throws IOException {
     HTableDescriptor htd = new HTableDescriptor(tableName.getBytes());
-    String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
     Set<String> cfSet = new HashSet<String>();
     for (String aColumn : columns) {
       if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)) continue;
@@ -315,7 +328,9 @@ public class ImportTsv {
       HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
       htd.addFamily(hcd);
     }
-    hbaseAdmin.createTable(htd);
+    LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.",
+      tableName, cfSet));
+    admin.createTable(htd);
   }
 
   /*
@@ -326,21 +341,23 @@ public class ImportTsv {
     System.err.println("ERROR: " + errorMsg);
     }
     String usage =
-      "Usage: " + NAME + " -Dimporttsv.columns=a,b,c <tablename> <inputdir>\n" +
+      "Usage: " + NAME + " -D"+ COLUMNS_CONF_KEY + "=a,b,c <tablename> <inputdir>\n" +
       "\n" +
       "Imports the given input directory of TSV data into the specified table.\n" +
       "\n" +
-      "The column names of the TSV data must be specified using the -Dimporttsv.columns\n" +
+      "The column names of the TSV data must be specified using the -D" + COLUMNS_CONF_KEY + "\n" +
       "option. This option takes the form of comma-separated column names, where each\n" +
       "column name is either a simple column family, or a columnfamily:qualifier. The special\n" +
-      "column name HBASE_ROW_KEY is used to designate that this column should be used\n" +
+      "column name " + TsvParser.ROWKEY_COLUMN_SPEC + " is used to designate that this column should be used\n" +
       "as the row key for each imported record. You must specify exactly one column\n" +
       "to be the row key, and you must specify a column name for every column that exists in the\n" +
-      "input data. Another special column HBASE_TS_KEY designates that this column should be\n" +
-      "used as timestamp for each record. Unlike HBASE_ROW_KEY, HBASE_TS_KEY is optional.\n" +
-      "You must specify atmost one column as timestamp key for each imported record.\n" +
+      "input data. Another special column" + TsvParser.TIMESTAMPKEY_COLUMN_SPEC +
+      " designates that this column should be\n" +
+      "used as timestamp for each record. Unlike " + TsvParser.ROWKEY_COLUMN_SPEC + ", " +
+      TsvParser.TIMESTAMPKEY_COLUMN_SPEC + " is optional.\n" +
+      "You must specify at most one column as timestamp key for each imported record.\n" +
       "Record with invalid timestamps (blank, non-numeric) will be treated as bad record.\n" +
-      "Note: if you use this option, then 'importtsv.timestamp' option will be ignored.\n" +
+      "Note: if you use this option, then '" + TIMESTAMP_CONF_KEY + "' option will be ignored.\n" +
       "\n" +
       "By default importtsv will load data directly into HBase. To instead generate\n" +
       "HFiles of data to prepare for a bulk data load, pass the option:\n" +
@@ -351,7 +368,8 @@ public class ImportTsv {
       " -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
       " '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" +
       " -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" +
-      " -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " + DEFAULT_MAPPER.getName() + "\n" +
+      " -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " +
+      DEFAULT_MAPPER.getName() + "\n" +
       "For performance consider the following options:\n" +
       " -Dmapred.map.tasks.speculative.execution=false\n" +
       " -Dmapred.reduce.tasks.speculative.execution=false";
@@ -359,76 +377,71 @@ public class ImportTsv {
     System.err.println(usage);
   }
 
-  /**
-   * Used only by test method
-   * @param conf
-   */
-  static void createHbaseAdmin(Configuration conf) throws IOException {
-    hbaseAdmin = new HBaseAdmin(conf);
-  }
-
-  /**
-   * Main entry point.
-   *
-   * @param args The command line parameters.
-   * @throws Exception When running the job fails.
-   */
-  public static void main(String[] args) throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+  @Override
+  public int run(String[] args) throws Exception {
+    setConf(HBaseConfiguration.create(getConf()));
+    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
     if (otherArgs.length < 2) {
       usage("Wrong number of arguments: " + otherArgs.length);
-      System.exit(-1);
+      return -1;
     }
 
-    // Make sure columns are specified
-    String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
-    if (columns == null) {
-      usage("No columns specified. Please specify with -D" +
+    // When MAPPER_CONF_KEY is null, the user wants to use the provided TsvImporterMapper, so
+    // perform validation on these additional args. When it's not null, user has provided their
+    // own mapper, thus these validation are not relevant.
+    // TODO: validation for TsvImporterMapper, not this tool. Move elsewhere.
+    if (null == getConf().get(MAPPER_CONF_KEY)) {
+      // Make sure columns are specified
+      String columns[] = getConf().getStrings(COLUMNS_CONF_KEY);
+      if (columns == null) {
+        usage("No columns specified. Please specify with -D" +
             COLUMNS_CONF_KEY+"=...");
-      System.exit(-1);
+        return -1;
       }
 
       // Make sure they specify exactly one column as the row key
-      int rowkeysFound=0;
+      int rowkeysFound = 0;
       for (String col : columns) {
         if (col.equals(TsvParser.ROWKEY_COLUMN_SPEC)) rowkeysFound++;
       }
       if (rowkeysFound != 1) {
         usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC);
-      System.exit(-1);
+        return -1;
       }
 
       // Make sure we have at most one column as the timestamp key
       int tskeysFound = 0;
       for (String col : columns) {
         if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC))
           tskeysFound++;
       }
       if (tskeysFound > 1) {
         usage("Must specify at most one column as "
             + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
-      System.exit(-1);
+        return -1;
       }
 
       // Make sure one or more columns are specified excluding rowkey and
       // timestamp key
       if (columns.length - (rowkeysFound + tskeysFound) < 1) {
         usage("One or more columns in addition to the row key and timestamp(optional) are required");
-      System.exit(-1);
+        return -1;
+      }
     }
 
     // If timestamp option is not specified, use current system time.
-    long timstamp = conf
-        .getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
+    long timstamp = getConf().getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
 
     // Set it back to replace invalid timestamp (non-numeric) with current
     // system time
-    conf.setLong(TIMESTAMP_CONF_KEY, timstamp);
+    getConf().setLong(TIMESTAMP_CONF_KEY, timstamp);
 
-    hbaseAdmin = new HBaseAdmin(conf);
-    Job job = createSubmittableJob(conf, otherArgs);
-    System.exit(job.waitForCompletion(true) ? 0 : 1);
+    Job job = createSubmittableJob(getConf(), otherArgs);
+    return job.waitForCompletion(true) ? 0 : 1;
   }
+
+  public static void main(String[] args) throws Exception {
+    int status = ToolRunner.run(new ImportTsv(), args);
+    System.exit(status);
+  }
 }
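The test changes below follow the same shift: instead of calling GenericOptionsParser and createSubmittableJob directly, TestImportTsv now configures an ImportTsv Tool and asserts on the exit code from ToolRunner. A condensed, hypothetical sketch of that pattern (class, table, columns, and paths are placeholders; like the real test, it needs a reachable cluster, which TestImportTsv provides via a mini cluster):

// Hedged sketch of the test pattern adopted below, not the actual TestImportTsv.
import static org.junit.Assert.assertEquals;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.ImportTsv;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;

public class ImportTsvToolPatternTest {
  @Test
  public void runsImportTsvAsTool() throws Exception {
    // Placeholder configuration; the real test uses htu1.getConfiguration().
    Configuration conf = HBaseConfiguration.create();
    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B");
    Tool tool = new ImportTsv();
    tool.setConf(conf);
    // args: target table and input directory (placeholders).
    String[] args = new String[] { "TestTable", "/tmp/importtsv-input" };
    assertEquals("ImportTsv should exit cleanly", 0, ToolRunner.run(tool, args));
  }
}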
TestImportTsv.java:

@@ -18,38 +18,44 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.UnsupportedEncodingException;
-import java.util.List;
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
-import org.junit.experimental.categories.Category;
-
-import static org.junit.Assert.*;
 
 @Category(MediumTests.class)
 public class TestImportTsv {
@@ -264,46 +270,39 @@ public class TestImportTsv {
     htu1.startMiniCluster();
     htu1.startMiniMapReduceCluster();
 
-    GenericOptionsParser opts = new GenericOptionsParser(htu1.getConfiguration(), args);
-    Configuration conf = opts.getConfiguration();
-    args = opts.getRemainingArgs();
+    Tool tool = new ImportTsv();
+    tool.setConf(htu1.getConfiguration());
 
     try {
-      FileSystem fs = FileSystem.get(conf);
+      FileSystem fs = FileSystem.get(tool.getConf());
       FSDataOutputStream op = fs.create(new Path(inputFile), true);
       if (data == null) {
         data = "KEY\u001bVALUE1\u001bVALUE2\n";
       }
       op.write(Bytes.toBytes(data));
       op.close();
+      LOG.debug(String.format("Wrote test data to file: %s", fs.makeQualified(new Path(inputFile))));
 
-      final byte[] FAM = Bytes.toBytes(family);
-      final byte[] TAB = Bytes.toBytes(tableName);
-      if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
-        HTableDescriptor desc = new HTableDescriptor(TAB);
-        desc.addFamily(new HColumnDescriptor(FAM));
-        HBaseAdmin admin = new HBaseAdmin(conf);
+      if (tool.getConf().get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
+        HTableDescriptor desc = new HTableDescriptor(tableName);
+        desc.addFamily(new HColumnDescriptor(family));
+        HBaseAdmin admin = new HBaseAdmin(tool.getConf());
         admin.createTable(desc);
         admin.close();
-      } else { // set the hbaseAdmin as we are not going through main()
-        LOG.info("set the hbaseAdmin");
-        ImportTsv.createHbaseAdmin(conf);
       }
       // force use of combiner for testing purposes
-      conf.setInt("min.num.spills.for.combine", 1);
-      Job job = ImportTsv.createSubmittableJob(conf, args);
-      job.waitForCompletion(false);
-      assertTrue(job.isSuccessful());
+      tool.getConf().setInt("min.num.spills.for.combine", 1);
+      assertEquals(0, ToolRunner.run(tool, args));
 
-      HTable table = new HTable(new Configuration(conf), TAB);
+      HTable table = new HTable(tool.getConf(), tableName);
       boolean verified = false;
-      long pause = conf.getLong("hbase.client.pause", 5 * 1000);
-      int numRetries = conf.getInt("hbase.client.retries.number", 5);
+      long pause = tool.getConf().getLong("hbase.client.pause", 5 * 1000);
+      int numRetries = tool.getConf().getInt("hbase.client.retries.number", 5);
       for (int i = 0; i < numRetries; i++) {
         try {
           Scan scan = new Scan();
           // Scan entire family.
-          scan.addFamily(FAM);
+          scan.addFamily(Bytes.toBytes(family));
           ResultScanner resScanner = table.getScanner(scan);
           for (Result res : resScanner) {
             assertTrue(res.size() == 2);