HBASE-10536 ImportTsv should fail fast if any of the column families passed to the job is not present in the table (denny joesph)
This commit is contained in:
parent
0f8894cd64
commit
eb4c194a87
|
@ -43,10 +43,8 @@ import org.apache.hadoop.hbase.TableNotFoundException;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
|
||||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
import org.apache.hadoop.hbase.util.Base64;
|
import org.apache.hadoop.hbase.util.Base64;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -99,6 +97,7 @@ public class ImportTsv extends Configured implements Tool {
|
||||||
final static String DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR = ",";
|
final static String DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR = ",";
|
||||||
final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
|
final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
|
||||||
public final static String CREATE_TABLE_CONF_KEY = "create.table";
|
public final static String CREATE_TABLE_CONF_KEY = "create.table";
|
||||||
|
public final static String NO_STRICT_COL_FAMILY = "no.strict";
|
||||||
|
|
||||||
public static class TsvParser {
|
public static class TsvParser {
|
||||||
/**
|
/**
|
||||||
|
@ -450,6 +449,32 @@ public class ImportTsv extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
try (HTable table = (HTable)connection.getTable(tableName)) {
|
try (HTable table = (HTable)connection.getTable(tableName)) {
|
||||||
|
boolean noStrict = conf.getBoolean(NO_STRICT_COL_FAMILY, false);
|
||||||
|
// if no.strict is false then check column family
|
||||||
|
if(!noStrict) {
|
||||||
|
ArrayList<String> unmatchedFamilies = new ArrayList<String>();
|
||||||
|
Set<String> cfSet = getColumnFamilies(columns);
|
||||||
|
HTableDescriptor tDesc = table.getTableDescriptor();
|
||||||
|
for (String cf : cfSet) {
|
||||||
|
if(tDesc.getFamily(Bytes.toBytes(cf)) == null) {
|
||||||
|
unmatchedFamilies.add(cf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(unmatchedFamilies.size() > 0) {
|
||||||
|
ArrayList<String> familyNames = new ArrayList<String>();
|
||||||
|
for (HColumnDescriptor family : table.getTableDescriptor().getFamilies()) {
|
||||||
|
familyNames.add(family.getNameAsString());
|
||||||
|
}
|
||||||
|
String msg =
|
||||||
|
"Column Families " + unmatchedFamilies + " specified in " + COLUMNS_CONF_KEY
|
||||||
|
+ " does not match with any of the table " + tableName
|
||||||
|
+ " column families " + familyNames + ".\n"
|
||||||
|
+ "To disable column family check, use -D" + NO_STRICT_COL_FAMILY
|
||||||
|
+ "=true.\n";
|
||||||
|
usage(msg);
|
||||||
|
System.exit(-1);
|
||||||
|
}
|
||||||
|
}
|
||||||
job.setReducerClass(PutSortReducer.class);
|
job.setReducerClass(PutSortReducer.class);
|
||||||
Path outputDir = new Path(hfileOutPath);
|
Path outputDir = new Path(hfileOutPath);
|
||||||
FileOutputFormat.setOutputPath(job, outputDir);
|
FileOutputFormat.setOutputPath(job, outputDir);
|
||||||
|
@ -494,6 +519,17 @@ public class ImportTsv extends Configured implements Tool {
|
||||||
private static void createTable(Admin admin, TableName tableName, String[] columns)
|
private static void createTable(Admin admin, TableName tableName, String[] columns)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
HTableDescriptor htd = new HTableDescriptor(tableName);
|
HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||||
|
Set<String> cfSet = getColumnFamilies(columns);
|
||||||
|
for (String cf : cfSet) {
|
||||||
|
HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
|
||||||
|
htd.addFamily(hcd);
|
||||||
|
}
|
||||||
|
LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.",
|
||||||
|
tableName, cfSet));
|
||||||
|
admin.createTable(htd);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Set<String> getColumnFamilies(String[] columns) {
|
||||||
Set<String> cfSet = new HashSet<String>();
|
Set<String> cfSet = new HashSet<String>();
|
||||||
for (String aColumn : columns) {
|
for (String aColumn : columns) {
|
||||||
if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
|
if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
|
||||||
|
@ -504,13 +540,7 @@ public class ImportTsv extends Configured implements Tool {
|
||||||
// we are only concerned with the first one (in case this is a cf:cq)
|
// we are only concerned with the first one (in case this is a cf:cq)
|
||||||
cfSet.add(aColumn.split(":", 2)[0]);
|
cfSet.add(aColumn.split(":", 2)[0]);
|
||||||
}
|
}
|
||||||
for (String cf : cfSet) {
|
return cfSet;
|
||||||
HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
|
|
||||||
htd.addFamily(hcd);
|
|
||||||
}
|
|
||||||
LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.",
|
|
||||||
tableName, cfSet));
|
|
||||||
admin.createTable(htd);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -556,7 +586,8 @@ public class ImportTsv extends Configured implements Tool {
|
||||||
" -D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the import\n" +
|
" -D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the import\n" +
|
||||||
" -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" +
|
" -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" +
|
||||||
" Note: if you set this to 'no', then the target table must already exist in HBase\n" +
|
" Note: if you set this to 'no', then the target table must already exist in HBase\n" +
|
||||||
"\n" +
|
" -D" + NO_STRICT_COL_FAMILY + "=true - ignore column family check in hbase table. " +
|
||||||
|
"Default is false\n\n" +
|
||||||
"For performance consider the following options:\n" +
|
"For performance consider the following options:\n" +
|
||||||
" -Dmapreduce.map.speculative=false\n" +
|
" -Dmapreduce.map.speculative=false\n" +
|
||||||
" -Dmapreduce.reduce.speculative=false";
|
" -Dmapreduce.reduce.speculative=false";
|
||||||
|
|
|
@ -187,6 +187,23 @@ public class TestImportTsv implements Configurable {
|
||||||
doMROnTableTest(util, FAMILY, null, args, 3);
|
doMROnTableTest(util, FAMILY, null, args, 3);
|
||||||
util.deleteTable(table);
|
util.deleteTable(table);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception {
|
||||||
|
String table = "test-" + UUID.randomUUID();
|
||||||
|
// Prepare the arguments required for the test.
|
||||||
|
Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
|
||||||
|
String[] args = new String[] {
|
||||||
|
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
|
||||||
|
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
|
||||||
|
"-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
|
||||||
|
"-D" + ImportTsv.NO_STRICT_COL_FAMILY + "=true",
|
||||||
|
table
|
||||||
|
};
|
||||||
|
util.createTable(TableName.valueOf(table), FAMILY);
|
||||||
|
doMROnTableTest(util, FAMILY, null, args, 3);
|
||||||
|
util.deleteTable(table);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
|
public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue