HBASE-20769 getSplits() has a out of bounds problem in TableSnapshotInputFormatImpl

This commit is contained in:
jingyuntian 2018-06-26 11:23:43 +08:00 committed by huzheng
parent 4ba6242a62
commit b41b05cd6b
3 changed files with 93 additions and 4 deletions

View File

@ -137,6 +137,10 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
public RegionInfo getRegion() {
return delegate.getRegionInfo();
}
TableSnapshotInputFormatImpl.InputSplit getDelegate() {
return this.delegate;
}
}
@VisibleForTesting

View File

@ -381,8 +381,19 @@ public class TableSnapshotInputFormatImpl {
calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);
Scan boundedScan = new Scan(scan);
boundedScan.setStartRow(sp[i]);
boundedScan.setStopRow(sp[i + 1]);
if (scan.getStartRow().length == 0) {
boundedScan.withStartRow(sp[i]);
} else {
boundedScan.withStartRow(
Bytes.compareTo(scan.getStartRow(), sp[i]) > 0 ? scan.getStartRow() : sp[i]);
}
if (scan.getStopRow().length == 0) {
boundedScan.withStopRow(sp[i + 1]);
} else {
boundedScan.withStopRow(
Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? scan.getStopRow() : sp[i + 1]);
}
splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
}

View File

@ -76,6 +76,8 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
private static final byte[] bbb = Bytes.toBytes("bbb");
private static final byte[] yyy = Bytes.toBytes("yyy");
private static final byte[] bbc = Bytes.toBytes("bbc");
private static final byte[] yya = Bytes.toBytes("yya");
@Rule
public TestName name = new TestName();
@ -246,6 +248,62 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
}
}
@Test
public void testWithMockedMapReduceWithSplitsPerRegion() throws Exception {
setupCluster();
String snapshotName = "testWithMockedMapReduceMultiRegion";
final TableName tableName = TableName.valueOf(name.getMethodName());
try {
createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);
Configuration conf = UTIL.getConfiguration();
conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
Job job = new Job(conf);
Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
// test scan with startRow and stopRow
Scan scan = new Scan(bbc, yya);
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
tmpTableDir, new RegionSplitter.UniformSplit(), 5);
verifyWithMockedMapReduce(job, 10, 40, bbc, yya);
} finally {
UTIL.getAdmin().deleteSnapshot(snapshotName);
UTIL.deleteTable(tableName);
tearDownCluster();
}
}
@Test
public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception {
setupCluster();
String snapshotName = "testWithMockedMapReduceMultiRegion";
final TableName tableName = TableName.valueOf(name.getMethodName());
try {
createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);
Configuration conf = UTIL.getConfiguration();
conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
Job job = new Job(conf);
Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
// test scan without startRow and stopRow
Scan scan2 = new Scan();
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan2,
TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
tmpTableDir, new RegionSplitter.UniformSplit(), 5);
verifyWithMockedMapReduce(job, 10, 50, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_START_ROW);
} finally {
UTIL.getAdmin().deleteSnapshot(snapshotName);
UTIL.deleteTable(tableName);
tearDownCluster();
}
}
@Test
public void testNoDuplicateResultsWhenSplitting() throws Exception {
setupCluster();
@ -306,8 +364,8 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
Assert.assertEquals(expectedNumSplits, splits.size());
HBaseTestingUtility.SeenRowTracker rowTracker =
new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow,
stopRow.length > 0 ? stopRow : Bytes.toBytes("\uffff"));
boolean localityEnabled =
job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
@ -317,12 +375,28 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
// validate input split
InputSplit split = splits.get(i);
Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
if (localityEnabled) {
Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
} else {
Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
}
Scan scan =
TableMapReduceUtil.convertStringToScan(snapshotRegionSplit.getDelegate().getScan());
if (startRow.length > 0) {
Assert.assertTrue(
Bytes.toStringBinary(startRow) + " should <= " + Bytes.toStringBinary(scan.getStartRow()),
Bytes.compareTo(startRow, scan.getStartRow()) <= 0);
}
if (stopRow.length > 0) {
Assert.assertTrue(
Bytes.toStringBinary(stopRow) + " should >= " + Bytes.toStringBinary(scan.getStopRow()),
Bytes.compareTo(stopRow, scan.getStopRow()) >= 0);
}
Assert.assertTrue("startRow should < stopRow",
Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) < 0);
// validate record reader
TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());