HBASE-5741 ImportTsv does not check for table existence (Himanshu)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1327338 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
421400c256
commit
34d88b8e23
|
@ -23,13 +23,18 @@ import org.apache.hadoop.hbase.util.Base64;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
|
@ -65,6 +70,7 @@ public class ImportTsv {
|
|||
final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
|
||||
final static String DEFAULT_SEPARATOR = "\t";
|
||||
final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
|
||||
private static HBaseAdmin hbaseAdmin;
|
||||
|
||||
static class TsvParser {
|
||||
/**
|
||||
|
@ -221,6 +227,9 @@ public class ImportTsv {
|
|||
|
||||
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
|
||||
if (hfileOutPath != null) {
|
||||
if (!doesTableExist(tableName)) {
|
||||
createTable(conf, tableName);
|
||||
}
|
||||
HTable table = new HTable(conf, tableName);
|
||||
job.setReducerClass(PutSortReducer.class);
|
||||
Path outputDir = new Path(hfileOutPath);
|
||||
|
@ -241,6 +250,27 @@ public class ImportTsv {
|
|||
return job;
|
||||
}
|
||||
|
||||
private static boolean doesTableExist(String tableName) throws IOException {
|
||||
return hbaseAdmin.tableExists(tableName.getBytes());
|
||||
}
|
||||
|
||||
private static void createTable(Configuration conf, String tableName)
|
||||
throws IOException {
|
||||
HTableDescriptor htd = new HTableDescriptor(tableName.getBytes());
|
||||
String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
|
||||
Set<String> cfSet = new HashSet<String>();
|
||||
for (String aColumn : columns) {
|
||||
if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)) continue;
|
||||
// we are only concerned with the first one (in case this is a cf:cq)
|
||||
cfSet.add(aColumn.split(":", 2)[0]);
|
||||
}
|
||||
for (String cf : cfSet) {
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
|
||||
htd.addFamily(hcd);
|
||||
}
|
||||
hbaseAdmin.createTable(htd);
|
||||
}
|
||||
|
||||
/*
|
||||
* @param errorMsg Error message. Can be null.
|
||||
*/
|
||||
|
@ -278,6 +308,14 @@ public class ImportTsv {
|
|||
System.err.println(usage);
|
||||
}
|
||||
|
||||
/**
|
||||
* Used only by test method
|
||||
* @param conf
|
||||
*/
|
||||
static void createHbaseAdmin(Configuration conf) throws IOException {
|
||||
hbaseAdmin = new HBaseAdmin(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Main entry point.
|
||||
*
|
||||
|
@ -315,7 +353,7 @@ public class ImportTsv {
|
|||
usage("One or more columns in addition to the row key are required");
|
||||
System.exit(-1);
|
||||
}
|
||||
|
||||
hbaseAdmin = new HBaseAdmin(conf);
|
||||
Job job = createSubmittableJob(conf, otherArgs);
|
||||
System.exit(job.waitForCompletion(true) ? 0 : 1);
|
||||
}
|
||||
|
|
|
@ -204,11 +204,13 @@ public class TestImportTsv {
|
|||
final byte[] TAB = Bytes.toBytes(tableName);
|
||||
final byte[] QA = Bytes.toBytes("A");
|
||||
final byte[] QB = Bytes.toBytes("B");
|
||||
|
||||
HTableDescriptor desc = new HTableDescriptor(TAB);
|
||||
desc.addFamily(new HColumnDescriptor(FAM));
|
||||
new HBaseAdmin(conf).createTable(desc);
|
||||
|
||||
if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
|
||||
HTableDescriptor desc = new HTableDescriptor(TAB);
|
||||
desc.addFamily(new HColumnDescriptor(FAM));
|
||||
new HBaseAdmin(conf).createTable(desc);
|
||||
} else { // set the hbaseAdmin as we are not going through main()
|
||||
ImportTsv.createHbaseAdmin(conf);
|
||||
}
|
||||
Job job = ImportTsv.createSubmittableJob(conf, args);
|
||||
job.waitForCompletion(false);
|
||||
assertTrue(job.isSuccessful());
|
||||
|
@ -255,6 +257,21 @@ public class TestImportTsv {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBulkOutputWithoutAnExistingTable() throws Exception {
|
||||
String TABLE_NAME = "TestTable";
|
||||
String FAMILY = "FAM";
|
||||
String INPUT_FILE = "InputFile2.esv";
|
||||
|
||||
// Prepare the arguments required for the test.
|
||||
String[] args = new String[] {
|
||||
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
|
||||
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
|
||||
"-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=output", TABLE_NAME,
|
||||
INPUT_FILE };
|
||||
doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3);
|
||||
}
|
||||
|
||||
public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException {
|
||||
return new String(bytes, HConstants.UTF8_ENCODING);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue