HBASE-20769 getSplits() has a out of bounds problem in TableSnapshotInputFormatImpl
Signed-off-by: huzheng <openinx@gmail.com>
This commit is contained in:
parent
56a86a8081
commit
36b49c5e17
|
@ -137,6 +137,10 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
|
|||
public RegionInfo getRegion() {
|
||||
return delegate.getRegionInfo();
|
||||
}
|
||||
|
||||
TableSnapshotInputFormatImpl.InputSplit getDelegate() {
|
||||
return this.delegate;
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
|
|
@ -381,8 +381,19 @@ public class TableSnapshotInputFormatImpl {
|
|||
calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);
|
||||
|
||||
Scan boundedScan = new Scan(scan);
|
||||
boundedScan.setStartRow(sp[i]);
|
||||
boundedScan.setStopRow(sp[i + 1]);
|
||||
if (scan.getStartRow().length == 0) {
|
||||
boundedScan.withStartRow(sp[i]);
|
||||
} else {
|
||||
boundedScan.withStartRow(
|
||||
Bytes.compareTo(scan.getStartRow(), sp[i]) > 0 ? scan.getStartRow() : sp[i]);
|
||||
}
|
||||
|
||||
if (scan.getStopRow().length == 0) {
|
||||
boundedScan.withStopRow(sp[i + 1]);
|
||||
} else {
|
||||
boundedScan.withStopRow(
|
||||
Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? scan.getStopRow() : sp[i + 1]);
|
||||
}
|
||||
|
||||
splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
|
||||
}
|
||||
|
|
|
@ -76,6 +76,8 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
|
|||
|
||||
private static final byte[] bbb = Bytes.toBytes("bbb");
|
||||
private static final byte[] yyy = Bytes.toBytes("yyy");
|
||||
private static final byte[] bbc = Bytes.toBytes("bbc");
|
||||
private static final byte[] yya = Bytes.toBytes("yya");
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
@ -246,6 +248,62 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithMockedMapReduceWithSplitsPerRegion() throws Exception {
|
||||
setupCluster();
|
||||
String snapshotName = "testWithMockedMapReduceMultiRegion";
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
try {
|
||||
createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);
|
||||
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
|
||||
Job job = new Job(conf);
|
||||
Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
|
||||
// test scan with startRow and stopRow
|
||||
Scan scan = new Scan(bbc, yya);
|
||||
|
||||
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
|
||||
TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
|
||||
tmpTableDir, new RegionSplitter.UniformSplit(), 5);
|
||||
|
||||
verifyWithMockedMapReduce(job, 10, 40, bbc, yya);
|
||||
} finally {
|
||||
UTIL.getAdmin().deleteSnapshot(snapshotName);
|
||||
UTIL.deleteTable(tableName);
|
||||
tearDownCluster();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception {
|
||||
setupCluster();
|
||||
String snapshotName = "testWithMockedMapReduceMultiRegion";
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
try {
|
||||
createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);
|
||||
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
|
||||
Job job = new Job(conf);
|
||||
Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
|
||||
// test scan without startRow and stopRow
|
||||
Scan scan2 = new Scan();
|
||||
|
||||
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan2,
|
||||
TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
|
||||
tmpTableDir, new RegionSplitter.UniformSplit(), 5);
|
||||
|
||||
verifyWithMockedMapReduce(job, 10, 50, HConstants.EMPTY_START_ROW,
|
||||
HConstants.EMPTY_START_ROW);
|
||||
|
||||
} finally {
|
||||
UTIL.getAdmin().deleteSnapshot(snapshotName);
|
||||
UTIL.deleteTable(tableName);
|
||||
tearDownCluster();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoDuplicateResultsWhenSplitting() throws Exception {
|
||||
setupCluster();
|
||||
|
@ -306,8 +364,8 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
|
|||
|
||||
Assert.assertEquals(expectedNumSplits, splits.size());
|
||||
|
||||
HBaseTestingUtility.SeenRowTracker rowTracker =
|
||||
new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
|
||||
HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow,
|
||||
stopRow.length > 0 ? stopRow : Bytes.toBytes("\uffff"));
|
||||
|
||||
boolean localityEnabled =
|
||||
job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
|
||||
|
@ -317,12 +375,28 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
|
|||
// validate input split
|
||||
InputSplit split = splits.get(i);
|
||||
Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
|
||||
TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
|
||||
if (localityEnabled) {
|
||||
Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
|
||||
} else {
|
||||
Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
|
||||
}
|
||||
|
||||
Scan scan =
|
||||
TableMapReduceUtil.convertStringToScan(snapshotRegionSplit.getDelegate().getScan());
|
||||
if (startRow.length > 0) {
|
||||
Assert.assertTrue(
|
||||
Bytes.toStringBinary(startRow) + " should <= " + Bytes.toStringBinary(scan.getStartRow()),
|
||||
Bytes.compareTo(startRow, scan.getStartRow()) <= 0);
|
||||
}
|
||||
if (stopRow.length > 0) {
|
||||
Assert.assertTrue(
|
||||
Bytes.toStringBinary(stopRow) + " should >= " + Bytes.toStringBinary(scan.getStopRow()),
|
||||
Bytes.compareTo(stopRow, scan.getStopRow()) >= 0);
|
||||
}
|
||||
Assert.assertTrue("startRow should < stopRow",
|
||||
Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) < 0);
|
||||
|
||||
// validate record reader
|
||||
TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
|
||||
when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
|
||||
|
|
Loading…
Reference in New Issue