HBASE-24387 TableSnapshotInputFormatImpl support row limit on each InputSplit (#1731)

Signed-off-by: Guanghao Zhang <zghao@apache.org>
Author: niuyulin, 2020-05-24 20:55:48 -05:00 (committed by Guanghao Zhang)
Parent: 58eed9a4bb
Commit: 61eb7e5003
2 changed files with 69 additions and 5 deletions

TableSnapshotInputFormatImpl.java

@@ -101,6 +101,12 @@ public class TableSnapshotInputFormatImpl {
     "hbase.TableSnapshotInputFormat.locality.enabled";
   public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT = true;
 
+  /**
+   * In some scenario, scan limited rows on each InputSplit for sampling data extraction
+   */
+  public static final String SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT =
+    "hbase.TableSnapshotInputFormat.row.limit.per.inputsplit";
+
   /**
    * Implementation class for InputSplit logic common between mapred and mapreduce.
    */
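
The new key is read directly from the job configuration, so a client opts into sampling by setting it before the snapshot job is submitted. A minimal sketch of that, mirroring how the test below sets the key (the value 1000 is illustrative; 0, the default, leaves the limit disabled):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    Configuration conf = HBaseConfiguration.create();
    // Cap every InputSplit of the snapshot-reading job at 1000 rows.
    conf.setInt("hbase.TableSnapshotInputFormat.row.limit.per.inputsplit", 1000);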
@@ -213,6 +219,8 @@ public class TableSnapshotInputFormatImpl {
     private Result result = null;
     private ImmutableBytesWritable row = null;
     private ClientSideRegionScanner scanner;
+    private int numOfCompleteRows = 0;
+    private int rowLimitPerSplit;
 
     public ClientSideRegionScanner getScanner() {
       return scanner;
@ -221,6 +229,7 @@ public class TableSnapshotInputFormatImpl {
public void initialize(InputSplit split, Configuration conf) throws IOException { public void initialize(InputSplit split, Configuration conf) throws IOException {
this.scan = TableMapReduceUtil.convertStringToScan(split.getScan()); this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
this.split = split; this.split = split;
this.rowLimitPerSplit = conf.getInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 0);
TableDescriptor htd = split.htd; TableDescriptor htd = split.htd;
HRegionInfo hri = this.split.getRegionInfo(); HRegionInfo hri = this.split.getRegionInfo();
FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf); FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf);
@@ -244,6 +253,9 @@ public class TableSnapshotInputFormatImpl {
         return false;
       }
+      if (rowLimitPerSplit > 0 && ++this.numOfCompleteRows > rowLimitPerSplit) {
+        return false;
+      }
       if (this.row == null) {
         this.row = new ImmutableBytesWritable();
       }
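
Because numOfCompleteRows lives on the RecordReader, the cap is enforced independently per split, and the pre-increment means the reader refuses the row after the limit rather than one past it. The job-wide effect is therefore a product, not a global cap; a back-of-the-envelope illustration (the variable names here are just for the arithmetic):

    // With R rows allowed per split and S input splits (one per region by
    // default), a job emits at most R * S rows in total.
    int rowLimitPerSplit = 10;                          // R
    int numSplits = 3;                                  // S
    int maxRowsJobWide = rowLimitPerSplit * numSplits;  // 30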
@ -296,10 +308,11 @@ public class TableSnapshotInputFormatImpl {
return getSplits(scan, manifest, regionInfos, restoreDir, conf, splitAlgo, numSplits); return getSplits(scan, manifest, regionInfos, restoreDir, conf, splitAlgo, numSplits);
} }
public static RegionSplitter.SplitAlgorithm getSplitAlgo(Configuration conf) throws IOException{ public static RegionSplitter.SplitAlgorithm getSplitAlgo(Configuration conf) throws IOException {
String splitAlgoClassName = conf.get(SPLIT_ALGO); String splitAlgoClassName = conf.get(SPLIT_ALGO);
if (splitAlgoClassName == null) if (splitAlgoClassName == null) {
return null; return null;
}
try { try {
return Class.forName(splitAlgoClassName).asSubclass(RegionSplitter.SplitAlgorithm.class) return Class.forName(splitAlgoClassName).asSubclass(RegionSplitter.SplitAlgorithm.class)
.getDeclaredConstructor().newInstance(); .getDeclaredConstructor().newInstance();
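
For context, getSplitAlgo reflectively instantiates whatever RegionSplitter.SplitAlgorithm implementation the SPLIT_ALGO key names, returning null when the key is unset. A sketch of supplying one of the stock algorithms, assuming SPLIT_ALGO is the public key constant on TableSnapshotInputFormatImpl:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
    import org.apache.hadoop.hbase.util.RegionSplitter;

    Configuration conf = HBaseConfiguration.create();
    // getSplitAlgo will Class.forName this value and invoke its no-arg constructor.
    conf.set(TableSnapshotInputFormatImpl.SPLIT_ALGO,
      RegionSplitter.UniformSplit.class.getName());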
@@ -511,9 +524,9 @@ public class TableSnapshotInputFormatImpl {
    * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
    * @param conf the job to configure
    * @param snapshotName the name of the snapshot to read from
-   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
-   *          have write permissions to this directory, and this should not be a subdirectory of rootdir.
-   *          After the job is finished, restoreDir can be deleted.
+   * @param restoreDir a temporary directory to restore the snapshot into. Current user should have
+   *          write permissions to this directory, and this should not be a subdirectory of rootdir.
+   *          After the job is finished, restoreDir can be deleted.
    * @param numSplitsPerRegion how many input splits to generate per one region
    * @param splitAlgo SplitAlgorithm to be used when generating InputSplits
    * @throws IOException if an error occurs
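
This reflowed javadoc belongs to the configuration variant that also accepts numSplitsPerRegion and splitAlgo. A sketch of one plausible call under assumed names (the snapshot name, restore path, and MyMapper are placeholders, and the exact overload shape should be checked against TableMapReduceUtil):

    // Two input splits per region, with split points chosen by UniformSplit.
    Job job = Job.getInstance(conf);
    TableMapReduceUtil.initTableSnapshotMapperJob("mySnapshot", new Scan(),
      MyMapper.class, NullWritable.class, NullWritable.class, job,
      true, new Path("/tmp/snapshot-restore"),
      new RegionSplitter.UniformSplit(), 2);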

TestTableSnapshotInputFormat.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.mapreduce;
 import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
 import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY;
+import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -304,6 +305,56 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {
     }
   }
 
+  @Test
+  public void testScanLimit() throws Exception {
+    setupCluster();
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final String snapshotName = tableName + "Snapshot";
+    Table table = null;
+    try {
+      UTIL.getConfiguration().setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 10);
+      if (UTIL.getAdmin().tableExists(tableName)) {
+        UTIL.deleteTable(tableName);
+      }
+      UTIL.createTable(tableName, FAMILIES, new byte[][] { bbb, yyy });
+      Admin admin = UTIL.getAdmin();
+      int regionNum = admin.getRegions(tableName).size();
+      // put some stuff in the table
+      table = UTIL.getConnection().getTable(tableName);
+      UTIL.loadTable(table, FAMILIES);
+      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
+      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());
+      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
+        null, snapshotName, rootDir, fs, true);
+      Job job = new Job(UTIL.getConfiguration());
+      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+      Scan scan = new Scan();
+      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
+        TestTableSnapshotInputFormat.class);
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
+        RowCounter.RowCounterMapper.class, NullWritable.class, NullWritable.class, job, true,
+        tmpTableDir);
+      Assert.assertTrue(job.waitForCompletion(true));
+      Assert.assertEquals(10 * regionNum,
+        job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue());
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+      UTIL.getConfiguration().unset(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT);
+      UTIL.getAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
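
The expected count follows from the per-split semantics sketched earlier: the table is pre-split at bbb and yyy, loadTable fills every region with far more than 10 rows, the overload used here produces one InputSplit per region, and the new cap stops each split's reader after 10 rows, so the ROWS counter should read 10 * regionNum.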
 
   @Test
   public void testNoDuplicateResultsWhenSplitting() throws Exception {
     setupCluster();