diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java index d3e0d560d0b..d90062f62c6 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java @@ -137,6 +137,10 @@ public class TableSnapshotInputFormat extends InputFormat 0 ? scan.getStartRow() : sp[i]); + } + + if (scan.getStopRow().length == 0) { + boundedScan.withStopRow(sp[i + 1]); + } else { + boundedScan.withStopRow( + Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? scan.getStopRow() : sp[i + 1]); + } splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir)); } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java index ac9862deb9c..f61c222fb90 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java @@ -76,6 +76,8 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa private static final byte[] bbb = Bytes.toBytes("bbb"); private static final byte[] yyy = Bytes.toBytes("yyy"); + private static final byte[] bbc = Bytes.toBytes("bbc"); + private static final byte[] yya = Bytes.toBytes("yya"); @Rule public TestName name = new TestName(); @@ -246,6 +248,62 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa } } + @Test + public void testWithMockedMapReduceWithSplitsPerRegion() throws Exception { + setupCluster(); + String snapshotName = "testWithMockedMapReduceMultiRegion"; + final TableName tableName = TableName.valueOf(name.getMethodName()); + try { + createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10); + + Configuration conf = UTIL.getConfiguration(); + conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false); + Job job = new Job(conf); + Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName); + // test scan with startRow and stopRow + Scan scan = new Scan(bbc, yya); + + TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan, + TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false, + tmpTableDir, new RegionSplitter.UniformSplit(), 5); + + verifyWithMockedMapReduce(job, 10, 40, bbc, yya); + } finally { + UTIL.getAdmin().deleteSnapshot(snapshotName); + UTIL.deleteTable(tableName); + tearDownCluster(); + } + } + + @Test + public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception { + setupCluster(); + String snapshotName = "testWithMockedMapReduceMultiRegion"; + final TableName tableName = TableName.valueOf(name.getMethodName()); + try { + createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10); + + Configuration conf = UTIL.getConfiguration(); + conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false); + Job job = new Job(conf); + Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName); + // test scan without startRow and stopRow + Scan scan2 = new Scan(); + + TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan2, + TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false, + tmpTableDir, new RegionSplitter.UniformSplit(), 5); + + verifyWithMockedMapReduce(job, 10, 50, HConstants.EMPTY_START_ROW, + HConstants.EMPTY_START_ROW); + + } finally { + UTIL.getAdmin().deleteSnapshot(snapshotName); + UTIL.deleteTable(tableName); + tearDownCluster(); + } + } + @Test public void testNoDuplicateResultsWhenSplitting() throws Exception { setupCluster(); @@ -306,8 +364,8 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa Assert.assertEquals(expectedNumSplits, splits.size()); - HBaseTestingUtility.SeenRowTracker rowTracker = - new HBaseTestingUtility.SeenRowTracker(startRow, stopRow); + HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow, + stopRow.length > 0 ? stopRow : Bytes.toBytes("\uffff")); boolean localityEnabled = job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, @@ -317,12 +375,28 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa // validate input split InputSplit split = splits.get(i); Assert.assertTrue(split instanceof TableSnapshotRegionSplit); + TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split; if (localityEnabled) { Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0); } else { Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0); } + Scan scan = + TableMapReduceUtil.convertStringToScan(snapshotRegionSplit.getDelegate().getScan()); + if (startRow.length > 0) { + Assert.assertTrue( + Bytes.toStringBinary(startRow) + " should <= " + Bytes.toStringBinary(scan.getStartRow()), + Bytes.compareTo(startRow, scan.getStartRow()) <= 0); + } + if (stopRow.length > 0) { + Assert.assertTrue( + Bytes.toStringBinary(stopRow) + " should >= " + Bytes.toStringBinary(scan.getStopRow()), + Bytes.compareTo(stopRow, scan.getStopRow()) >= 0); + } + Assert.assertTrue("startRow should < stopRow", + Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) < 0); + // validate record reader TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class); when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());