Revert "HBASE-20769 getSplits() has a out of bounds problem in TableSnapshotInputFormatImpl"
This reverts commit 48cd82e134
.
This commit is contained in:
parent
73601293fc
commit
2c02fb66a0
|
@ -137,10 +137,6 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
|
||||||
public RegionInfo getRegion() {
|
public RegionInfo getRegion() {
|
||||||
return delegate.getRegionInfo();
|
return delegate.getRegionInfo();
|
||||||
}
|
}
|
||||||
|
|
||||||
TableSnapshotInputFormatImpl.InputSplit getDelegate() {
|
|
||||||
return this.delegate;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|
|
@ -381,19 +381,8 @@ public class TableSnapshotInputFormatImpl {
|
||||||
calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);
|
calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);
|
||||||
|
|
||||||
Scan boundedScan = new Scan(scan);
|
Scan boundedScan = new Scan(scan);
|
||||||
if (scan.getStartRow().length == 0) {
|
boundedScan.setStartRow(sp[i]);
|
||||||
boundedScan.withStartRow(sp[i]);
|
boundedScan.setStopRow(sp[i + 1]);
|
||||||
} else {
|
|
||||||
boundedScan.withStartRow(
|
|
||||||
Bytes.compareTo(scan.getStartRow(), sp[i]) > 0 ? scan.getStartRow() : sp[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (scan.getStopRow().length == 0) {
|
|
||||||
boundedScan.withStopRow(sp[i + 1]);
|
|
||||||
} else {
|
|
||||||
boundedScan.withStopRow(
|
|
||||||
Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? scan.getStopRow() : sp[i + 1]);
|
|
||||||
}
|
|
||||||
|
|
||||||
splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
|
splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,8 +76,6 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
|
||||||
|
|
||||||
private static final byte[] bbb = Bytes.toBytes("bbb");
|
private static final byte[] bbb = Bytes.toBytes("bbb");
|
||||||
private static final byte[] yyy = Bytes.toBytes("yyy");
|
private static final byte[] yyy = Bytes.toBytes("yyy");
|
||||||
private static final byte[] bbc = Bytes.toBytes("bbc");
|
|
||||||
private static final byte[] yya = Bytes.toBytes("yya");
|
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public TestName name = new TestName();
|
public TestName name = new TestName();
|
||||||
|
@ -248,62 +246,6 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testWithMockedMapReduceWithSplitsPerRegion() throws Exception {
|
|
||||||
setupCluster();
|
|
||||||
String snapshotName = "testWithMockedMapReduceMultiRegion";
|
|
||||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
|
||||||
try {
|
|
||||||
createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);
|
|
||||||
|
|
||||||
Configuration conf = UTIL.getConfiguration();
|
|
||||||
conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
|
|
||||||
Job job = new Job(conf);
|
|
||||||
Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
|
|
||||||
// test scan with startRow and stopRow
|
|
||||||
Scan scan = new Scan(bbc, yya);
|
|
||||||
|
|
||||||
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
|
|
||||||
TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
|
|
||||||
tmpTableDir, new RegionSplitter.UniformSplit(), 5);
|
|
||||||
|
|
||||||
verifyWithMockedMapReduce(job, 10, 40, bbc, yya);
|
|
||||||
} finally {
|
|
||||||
UTIL.getAdmin().deleteSnapshot(snapshotName);
|
|
||||||
UTIL.deleteTable(tableName);
|
|
||||||
tearDownCluster();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception {
|
|
||||||
setupCluster();
|
|
||||||
String snapshotName = "testWithMockedMapReduceMultiRegion";
|
|
||||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
|
||||||
try {
|
|
||||||
createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);
|
|
||||||
|
|
||||||
Configuration conf = UTIL.getConfiguration();
|
|
||||||
conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
|
|
||||||
Job job = new Job(conf);
|
|
||||||
Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
|
|
||||||
// test scan without startRow and stopRow
|
|
||||||
Scan scan2 = new Scan();
|
|
||||||
|
|
||||||
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan2,
|
|
||||||
TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
|
|
||||||
tmpTableDir, new RegionSplitter.UniformSplit(), 5);
|
|
||||||
|
|
||||||
verifyWithMockedMapReduce(job, 10, 50, HConstants.EMPTY_START_ROW,
|
|
||||||
HConstants.EMPTY_START_ROW);
|
|
||||||
|
|
||||||
} finally {
|
|
||||||
UTIL.getAdmin().deleteSnapshot(snapshotName);
|
|
||||||
UTIL.deleteTable(tableName);
|
|
||||||
tearDownCluster();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNoDuplicateResultsWhenSplitting() throws Exception {
|
public void testNoDuplicateResultsWhenSplitting() throws Exception {
|
||||||
setupCluster();
|
setupCluster();
|
||||||
|
@ -364,8 +306,8 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
|
||||||
|
|
||||||
Assert.assertEquals(expectedNumSplits, splits.size());
|
Assert.assertEquals(expectedNumSplits, splits.size());
|
||||||
|
|
||||||
HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow,
|
HBaseTestingUtility.SeenRowTracker rowTracker =
|
||||||
stopRow.length > 0 ? stopRow : Bytes.toBytes("\uffff"));
|
new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
|
||||||
|
|
||||||
boolean localityEnabled =
|
boolean localityEnabled =
|
||||||
job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
|
job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
|
||||||
|
@ -375,28 +317,12 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
|
||||||
// validate input split
|
// validate input split
|
||||||
InputSplit split = splits.get(i);
|
InputSplit split = splits.get(i);
|
||||||
Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
|
Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
|
||||||
TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
|
|
||||||
if (localityEnabled) {
|
if (localityEnabled) {
|
||||||
Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
|
Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
|
||||||
} else {
|
} else {
|
||||||
Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
|
Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
Scan scan =
|
|
||||||
TableMapReduceUtil.convertStringToScan(snapshotRegionSplit.getDelegate().getScan());
|
|
||||||
if (startRow.length > 0) {
|
|
||||||
Assert.assertTrue(
|
|
||||||
Bytes.toStringBinary(startRow) + " should <= " + Bytes.toStringBinary(scan.getStartRow()),
|
|
||||||
Bytes.compareTo(startRow, scan.getStartRow()) <= 0);
|
|
||||||
}
|
|
||||||
if (stopRow.length > 0) {
|
|
||||||
Assert.assertTrue(
|
|
||||||
Bytes.toStringBinary(stopRow) + " should >= " + Bytes.toStringBinary(scan.getStopRow()),
|
|
||||||
Bytes.compareTo(stopRow, scan.getStopRow()) >= 0);
|
|
||||||
}
|
|
||||||
Assert.assertTrue("startRow should < stopRow",
|
|
||||||
Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) < 0);
|
|
||||||
|
|
||||||
// validate record reader
|
// validate record reader
|
||||||
TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
|
TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
|
||||||
when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
|
when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
|
||||||
|
|
Loading…
Reference in New Issue