HBASE-20769 getSplits() has a out of bounds problem in TableSnapshotInputFormatImpl
This commit is contained in:
parent
a1b297e673
commit
48cd82e134
|
@ -137,6 +137,10 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
|
||||||
public RegionInfo getRegion() {
|
public RegionInfo getRegion() {
|
||||||
return delegate.getRegionInfo();
|
return delegate.getRegionInfo();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TableSnapshotInputFormatImpl.InputSplit getDelegate() {
|
||||||
|
return this.delegate;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|
|
@ -381,8 +381,19 @@ public class TableSnapshotInputFormatImpl {
|
||||||
calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);
|
calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);
|
||||||
|
|
||||||
Scan boundedScan = new Scan(scan);
|
Scan boundedScan = new Scan(scan);
|
||||||
boundedScan.setStartRow(sp[i]);
|
if (scan.getStartRow().length == 0) {
|
||||||
boundedScan.setStopRow(sp[i + 1]);
|
boundedScan.withStartRow(sp[i]);
|
||||||
|
} else {
|
||||||
|
boundedScan.withStartRow(
|
||||||
|
Bytes.compareTo(scan.getStartRow(), sp[i]) > 0 ? scan.getStartRow() : sp[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (scan.getStopRow().length == 0) {
|
||||||
|
boundedScan.withStopRow(sp[i + 1]);
|
||||||
|
} else {
|
||||||
|
boundedScan.withStopRow(
|
||||||
|
Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? scan.getStopRow() : sp[i + 1]);
|
||||||
|
}
|
||||||
|
|
||||||
splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
|
splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,6 +76,8 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
|
||||||
|
|
||||||
private static final byte[] bbb = Bytes.toBytes("bbb");
|
private static final byte[] bbb = Bytes.toBytes("bbb");
|
||||||
private static final byte[] yyy = Bytes.toBytes("yyy");
|
private static final byte[] yyy = Bytes.toBytes("yyy");
|
||||||
|
private static final byte[] bbc = Bytes.toBytes("bbc");
|
||||||
|
private static final byte[] yya = Bytes.toBytes("yya");
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public TestName name = new TestName();
|
public TestName name = new TestName();
|
||||||
|
@ -246,6 +248,62 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithMockedMapReduceWithSplitsPerRegion() throws Exception {
|
||||||
|
setupCluster();
|
||||||
|
String snapshotName = "testWithMockedMapReduceMultiRegion";
|
||||||
|
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||||
|
try {
|
||||||
|
createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);
|
||||||
|
|
||||||
|
Configuration conf = UTIL.getConfiguration();
|
||||||
|
conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
|
||||||
|
Job job = new Job(conf);
|
||||||
|
Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
|
||||||
|
// test scan with startRow and stopRow
|
||||||
|
Scan scan = new Scan(bbc, yya);
|
||||||
|
|
||||||
|
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
|
||||||
|
TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
|
||||||
|
tmpTableDir, new RegionSplitter.UniformSplit(), 5);
|
||||||
|
|
||||||
|
verifyWithMockedMapReduce(job, 10, 40, bbc, yya);
|
||||||
|
} finally {
|
||||||
|
UTIL.getAdmin().deleteSnapshot(snapshotName);
|
||||||
|
UTIL.deleteTable(tableName);
|
||||||
|
tearDownCluster();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception {
|
||||||
|
setupCluster();
|
||||||
|
String snapshotName = "testWithMockedMapReduceMultiRegion";
|
||||||
|
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||||
|
try {
|
||||||
|
createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);
|
||||||
|
|
||||||
|
Configuration conf = UTIL.getConfiguration();
|
||||||
|
conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
|
||||||
|
Job job = new Job(conf);
|
||||||
|
Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
|
||||||
|
// test scan without startRow and stopRow
|
||||||
|
Scan scan2 = new Scan();
|
||||||
|
|
||||||
|
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan2,
|
||||||
|
TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
|
||||||
|
tmpTableDir, new RegionSplitter.UniformSplit(), 5);
|
||||||
|
|
||||||
|
verifyWithMockedMapReduce(job, 10, 50, HConstants.EMPTY_START_ROW,
|
||||||
|
HConstants.EMPTY_START_ROW);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
UTIL.getAdmin().deleteSnapshot(snapshotName);
|
||||||
|
UTIL.deleteTable(tableName);
|
||||||
|
tearDownCluster();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNoDuplicateResultsWhenSplitting() throws Exception {
|
public void testNoDuplicateResultsWhenSplitting() throws Exception {
|
||||||
setupCluster();
|
setupCluster();
|
||||||
|
@ -306,8 +364,8 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
|
||||||
|
|
||||||
Assert.assertEquals(expectedNumSplits, splits.size());
|
Assert.assertEquals(expectedNumSplits, splits.size());
|
||||||
|
|
||||||
HBaseTestingUtility.SeenRowTracker rowTracker =
|
HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow,
|
||||||
new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
|
stopRow.length > 0 ? stopRow : Bytes.toBytes("\uffff"));
|
||||||
|
|
||||||
boolean localityEnabled =
|
boolean localityEnabled =
|
||||||
job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
|
job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
|
||||||
|
@ -317,12 +375,28 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
|
||||||
// validate input split
|
// validate input split
|
||||||
InputSplit split = splits.get(i);
|
InputSplit split = splits.get(i);
|
||||||
Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
|
Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
|
||||||
|
TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
|
||||||
if (localityEnabled) {
|
if (localityEnabled) {
|
||||||
Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
|
Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
|
||||||
} else {
|
} else {
|
||||||
Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
|
Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Scan scan =
|
||||||
|
TableMapReduceUtil.convertStringToScan(snapshotRegionSplit.getDelegate().getScan());
|
||||||
|
if (startRow.length > 0) {
|
||||||
|
Assert.assertTrue(
|
||||||
|
Bytes.toStringBinary(startRow) + " should <= " + Bytes.toStringBinary(scan.getStartRow()),
|
||||||
|
Bytes.compareTo(startRow, scan.getStartRow()) <= 0);
|
||||||
|
}
|
||||||
|
if (stopRow.length > 0) {
|
||||||
|
Assert.assertTrue(
|
||||||
|
Bytes.toStringBinary(stopRow) + " should >= " + Bytes.toStringBinary(scan.getStopRow()),
|
||||||
|
Bytes.compareTo(stopRow, scan.getStopRow()) >= 0);
|
||||||
|
}
|
||||||
|
Assert.assertTrue("startRow should < stopRow",
|
||||||
|
Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) < 0);
|
||||||
|
|
||||||
// validate record reader
|
// validate record reader
|
||||||
TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
|
TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
|
||||||
when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
|
when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
|
||||||
|
|
Loading…
Reference in New Issue