HBASE-15432 TableInputFormat - support multi column family scan (Xuesen Liang)

tedyu 2016-12-16 18:04:55 -08:00
parent 49b0bab504
commit 2c107e4d08
6 changed files with 46 additions and 30 deletions
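
Note: this patch lets hbase.mapreduce.scan.column.family (TableInputFormat.SCAN_COLUMN_FAMILY) carry a comma-separated list of column families instead of a single family name. A minimal driver-side sketch of opting in, assuming a placeholder table "mytable" and placeholder families "cf1"/"cf2":

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

public class MultiFamilyScanConfigSketch {
  public static Configuration configure() {
    Configuration conf = HBaseConfiguration.create();
    // Placeholder table name, for illustration only.
    conf.set(TableInputFormat.INPUT_TABLE, "mytable");
    // With this patch the value is parsed via getTrimmedStrings(), so a
    // comma-separated list selects several families; previously only a
    // single family name was honored here.
    conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "cf1,cf2");
    return conf;
  }
}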

CellCounter.java

@@ -305,7 +305,7 @@ public class CellCounter extends Configured implements Tool {
     System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "=<rowkey>");
     System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "=<rowkey>");
     System.err.println(" -D " + TableInputFormat.SCAN_COLUMNS + "=\"<col1> <col2>...\"");
-    System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
+    System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<family1>,<family2>, ...");
     System.err.println(" -D " + TableInputFormat.SCAN_TIMESTAMP + "=<timestamp>");
     System.err.println(" -D " + TableInputFormat.SCAN_TIMERANGE_START + "=<timestamp>");
     System.err.println(" -D " + TableInputFormat.SCAN_TIMERANGE_END + "=<timestamp>");
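
The updated usage line means CellCounter can be restricted to a subset of families. A hedged sketch of driving it through ToolRunner, mirroring the updated test later in this commit; the table name, output path, family names and the ";" report separator are placeholders:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.CellCounter;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.util.ToolRunner;

public class CellCounterTwoFamiliesSketch {
  public static void main(String[] args) throws Exception {
    // Count cells in families "a" and "b" only; the -D option is passed as
    // two tokens so GenericOptionsParser applies it to the configuration.
    int rc = ToolRunner.run(HBaseConfiguration.create(), new CellCounter(),
        new String[] {
            "-D", TableInputFormat.SCAN_COLUMN_FAMILY + "=a,b",
            "mytable", "/tmp/cellcounter-out", ";" });
    System.exit(rc);
  }
}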

Export.java

@@ -110,9 +110,8 @@ public class Export extends Configured implements Tool {
     if (raw) {
       s.setRaw(raw);
     }
-    if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
-      s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
+    for (String columnFamily : conf.getTrimmedStrings(TableInputFormat.SCAN_COLUMN_FAMILY)) {
+      s.addFamily(Bytes.toBytes(columnFamily));
     }
     // Set RowFilter or Prefix Filter if applicable.
     Filter exportFilter = getExportFilter(args);
@@ -163,7 +162,7 @@
     System.err.println(" -D mapreduce.output.fileoutputformat.compress.type=BLOCK");
     System.err.println(" Additionally, the following SCAN properties can be specified");
     System.err.println(" to control/limit what is exported..");
-    System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
+    System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<family1>,<family2>, ...");
     System.err.println(" -D " + RAW_SCAN + "=true");
     System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "=<ROWSTART>");
     System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "=<ROWSTOP>");
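
Both Export and TableInputFormat now parse the family list with Configuration.getTrimmedStrings(). A small self-contained sketch of what that call does, with made-up property values:

import org.apache.hadoop.conf.Configuration;

public class GetTrimmedStringsDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("hbase.mapreduce.scan.column.family", "a, b");
    // getTrimmedStrings() splits on commas and trims whitespace, so
    // "a, b" and "a,b" both yield the families "a" and "b".
    for (String family : conf.getTrimmedStrings("hbase.mapreduce.scan.column.family")) {
      System.out.println("[" + family + "]");  // prints [a] then [b]
    }
    // When the property is unset, an empty array comes back and the loop
    // body never runs, which is why the old null check could be dropped.
    for (String family : conf.getTrimmedStrings("some.unset.property")) {
      System.out.println(family);  // not reached
    }
  }
}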

TableInputFormat.java

@@ -161,8 +161,8 @@ implements Configurable {
       addColumns(scan, conf.get(SCAN_COLUMNS));
     }
-    if (conf.get(SCAN_COLUMN_FAMILY) != null) {
-      scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
+    for (String columnFamily : conf.getTrimmedStrings(SCAN_COLUMN_FAMILY)) {
+      scan.addFamily(Bytes.toBytes(columnFamily));
     }
     if (conf.get(SCAN_TIMESTAMP) != null) {
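
In Scan terms, the loop above boils down to one addFamily() call per configured family. A minimal sketch of what setConf() now builds when SCAN_COLUMN_FAMILY is "cf1,cf2" (placeholder family names):

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class EquivalentScanSketch {
  static Scan buildScan() {
    // One addFamily() call per configured family, mirroring the loop above.
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("cf1"));
    scan.addFamily(Bytes.toBytes("cf2"));
    return scan;
  }
}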

TestCellCounter.java

@@ -310,7 +310,7 @@ public class TestCellCounter {
   /**
    * Test CellCounter for complete table all data should print to output
    */
-  @Test(timeout = 300000)
+  @Test(timeout = 600000)
   public void testCellCounterForCompleteTable() throws Exception {
     TableName sourceTable = TableName.valueOf("testCellCounterForCompleteTable");
     String outputPath = OUTPUT_DIR + sourceTable;
@@ -346,8 +346,18 @@
       assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1"));
       assertTrue(data.contains("row2;a;q_Versions" + "\t" + "1"));
       assertTrue(data.contains("row2;b;q_Versions" + "\t" + "1"));
+      FileUtil.fullyDelete(new File(outputPath));
+      args = new String[] { "-D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=a, b",
+          sourceTable.getNameAsString(), outputDir.toString(), ";"};
+      runCount(args);
+      inputStream = new FileInputStream(outputPath + File.separator + "part-r-00000");
+      String data2 = IOUtils.toString(inputStream);
+      inputStream.close();
+      assertEquals(data, data2);
     } finally {
       t.close();
       localFileSystem.close();
       FileUtil.fullyDelete(new File(outputPath));
     }
   }

TestImportExport.java

@@ -572,16 +572,18 @@ public class TestImportExport {
       fail("should be SecurityException");
     } catch (SecurityException e) {
       assertEquals(-1, newSecurityManager.getExitCode());
-      assertTrue(data.toString().contains("Wrong number of arguments:"));
-      assertTrue(data.toString().contains(
+      String errMsg = data.toString();
+      assertTrue(errMsg.contains("Wrong number of arguments:"));
+      assertTrue(errMsg.contains(
           "Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
           "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]"));
-      assertTrue(data.toString().contains("-D hbase.mapreduce.scan.column.family=<familyName>"));
-      assertTrue(data.toString().contains("-D hbase.mapreduce.include.deleted.rows=true"));
-      assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100"));
-      assertTrue(data.toString().contains("-Dmapreduce.map.speculative=false"));
-      assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false"));
-      assertTrue(data.toString().contains("-Dhbase.export.scanner.batch=10"));
+      assertTrue(
+        errMsg.contains("-D hbase.mapreduce.scan.column.family=<family1>,<family2>, ..."));
+      assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true"));
+      assertTrue(errMsg.contains("-Dhbase.client.scanner.caching=100"));
+      assertTrue(errMsg.contains("-Dmapreduce.map.speculative=false"));
+      assertTrue(errMsg.contains("-Dmapreduce.reduce.speculative=false"));
+      assertTrue(errMsg.contains("-Dhbase.export.scanner.batch=10"));
     } finally {
       System.setErr(oldPrintStream);
       System.setSecurityManager(SECURITY_MANAGER);

TestTableInputFormatScanBase.java

@@ -65,7 +65,7 @@ public abstract class TestTableInputFormatScanBase {
   static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   static final TableName TABLE_NAME = TableName.valueOf("scantest");
-  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+  static final byte[][] INPUT_FAMILYS = {Bytes.toBytes("content1"), Bytes.toBytes("content2")};
   static final String KEY_STARTROW = "startRow";
   static final String KEY_LASTROW = "stpRow";
@@ -83,8 +83,8 @@
     // start mini hbase cluster
     TEST_UTIL.startMiniCluster(3);
     // create and fill table
-    table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, INPUT_FAMILY);
-    TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
+    table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, INPUT_FAMILYS);
+    TEST_UTIL.loadTable(table, INPUT_FAMILYS, null, false);
   }
   @AfterClass
@@ -110,21 +110,23 @@
     public void map(ImmutableBytesWritable key, Result value,
       Context context)
     throws IOException, InterruptedException {
-      if (value.size() != 1) {
-        throw new IOException("There should only be one input column");
+      if (value.size() != 2) {
+        throw new IOException("There should be two input columns");
       }
       Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
-        cf = value.getMap();
-      if(!cf.containsKey(INPUT_FAMILY)) {
+        cfMap = value.getMap();
+      if (!cfMap.containsKey(INPUT_FAMILYS[0]) || !cfMap.containsKey(INPUT_FAMILYS[1])) {
         throw new IOException("Wrong input columns. Missing: '" +
-          Bytes.toString(INPUT_FAMILY) + "'.");
+          Bytes.toString(INPUT_FAMILYS[0]) + "' or '" + Bytes.toString(INPUT_FAMILYS[1]) + "'.");
       }
-      String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
+      String val0 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[0], null));
+      String val1 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[1], null));
       LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) +
-        ", value -> " + val);
+        ", value -> (" + val0 + ", " + val1 + ")");
       context.write(key, key);
     }
   }
   /**
@@ -181,7 +183,8 @@
       "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
     c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString());
-    c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILY));
+    c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILYS[0]) + ", "
+      + Bytes.toString(INPUT_FAMILYS[1]));
     c.set(KEY_STARTROW, start != null ? start : "");
     c.set(KEY_LASTROW, last != null ? last : "");
@@ -219,7 +222,8 @@
     LOG.info("Before map/reduce startup - job " + jobName);
     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
     Scan scan = new Scan();
-    scan.addFamily(INPUT_FAMILY);
+    scan.addFamily(INPUT_FAMILYS[0]);
+    scan.addFamily(INPUT_FAMILYS[1]);
     if (start != null) {
       scan.setStartRow(Bytes.toBytes(start));
     }
@@ -256,7 +260,8 @@
     LOG.info("Before map/reduce startup - job " + jobName);
     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
     Scan scan = new Scan();
-    scan.addFamily(INPUT_FAMILY);
+    scan.addFamily(INPUT_FAMILYS[0]);
+    scan.addFamily(INPUT_FAMILYS[1]);
     c.set("hbase.mapreduce.input.autobalance", "true");
     c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio);
     c.set(KEY_STARTROW, "");
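
For completeness, a hedged sketch of wiring a two-family Scan into a MapReduce job the way these tests do; IdentityTableMapper stands in for the test's own mapper, and the table and family names copy the test constants but are otherwise placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class MultiFamilyScanJobSketch {
  public static Job createJob(Configuration conf) throws Exception {
    Job job = Job.getInstance(conf, "multi-family-scan");
    Scan scan = new Scan();
    // Scan both families, as the updated tests now do.
    scan.addFamily(Bytes.toBytes("content1"));
    scan.addFamily(Bytes.toBytes("content2"));
    TableMapReduceUtil.initTableMapperJob("scantest", scan, IdentityTableMapper.class,
        ImmutableBytesWritable.class, Result.class, job);
    // Discard output; the mapper only reads the table.
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);
    return job;
  }
}

Submitting the sketch is then just a call to job.waitForCompletion(true).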