HBASE-16011 TableSnapshotScanner and TableSnapshotInputFormat can produce duplicate rows if the table has split.

Signed-off-by: tedyu <yuzhihong@gmail.com>
Author:    huzheng (2017-05-24 20:31:57 +08:00)
Committer: tedyu
Parent:    d5838c78fc
Commit:    d8c1e0e004
5 changed files with 141 additions and 3 deletions
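
Root cause, in brief: if a snapshot is taken while a region split is in flight, the
snapshot manifest can contain the offline split parent as well as both daughter
regions. The parent's key range is the union of its daughters' ranges, so scanning
the parent and the daughters returns every row in that range twice. The fix applies
the same guard in each consumer of the manifest. A minimal sketch of that guard as a
standalone predicate (hypothetical helper; the patch inlines the check at each call
site):

    // Hypothetical helper equivalent to the check inlined by this patch.
    private static boolean isSplitParent(HRegionInfo hri) {
      // An offline region flagged as split (or split parent) has been replaced by
      // its daughters, which reference the same HFiles; scanning it as well would
      // return duplicate rows.
      return hri.isOffline() && (hri.isSplit() || hri.isSplitParent());
    }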

TableSnapshotScanner.java

@@ -128,7 +128,10 @@ public class TableSnapshotScanner extends AbstractClientScanner {
       htd = meta.getTableDescriptor();
       regions = new ArrayList<HRegionInfo>(restoredRegions.size());
-      for (HRegionInfo hri: restoredRegions) {
+      for (HRegionInfo hri : restoredRegions) {
+        if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
+          continue;
+        }
         if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
           hri.getStartKey(), hri.getEndKey())) {
           regions.add(hri);
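
For context, CellUtil.overlappingKeys tests whether the half-open scan range
[startRow, stopRow) intersects the region's [startKey, endKey), with an empty byte
array meaning unbounded. A simplified sketch of that contract (an illustration, not
the actual implementation):

    // Sketch of the overlap test, assuming empty arrays mean "unbounded".
    static boolean overlaps(byte[] scanStart, byte[] scanStop,
        byte[] regionStart, byte[] regionEnd) {
      boolean scanStartsBeforeRegionEnds = regionEnd.length == 0
          || scanStart.length == 0 || Bytes.compareTo(scanStart, regionEnd) < 0;
      boolean regionStartsBeforeScanEnds = scanStop.length == 0
          || regionStart.length == 0 || Bytes.compareTo(regionStart, scanStop) < 0;
      return scanStartsBeforeRegionEnds && regionStartsBeforeScanEnds;
    }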

TableSnapshotInputFormatImpl.java

@@ -274,7 +274,11 @@ public class TableSnapshotInputFormatImpl {
     List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());
     for (SnapshotRegionManifest regionManifest : regionManifests) {
-      regionInfos.add(HRegionInfo.convert(regionManifest.getRegionInfo()));
+      HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());
+      if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
+        continue;
+      }
+      regionInfos.add(hri);
     }
     return regionInfos;
   }
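
The guard matters even more here, since getSplits builds one input split per
returned HRegionInfo. An illustration of the region layout after splitting at "eee"
(hypothetical constructor calls, for exposition only):

    // After admin.split(tableName, Bytes.toBytes("eee")):
    byte[] EMPTY = new byte[0];
    HRegionInfo parent = new HRegionInfo(tableName, EMPTY, EMPTY);   // offline split parent
    HRegionInfo daughterA = new HRegionInfo(tableName, EMPTY, Bytes.toBytes("eee"));
    HRegionInfo daughterB = new HRegionInfo(tableName, Bytes.toBytes("eee"), EMPTY);
    // A scan over ["bbb", "yyy") overlaps all three regions, so without the
    // isOffline()/isSplit() filter each row would be read by two splits.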

TestTableSnapshotScanner.java

@@ -109,6 +109,56 @@ public class TestTableSnapshotScanner {
     table.close();
   }

+  @Test
+  public void testNoDuplicateResultsWhenSplitting() throws Exception {
+    setupCluster();
+    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
+    String snapshotName = "testSnapshotBug";
+    try {
+      if (UTIL.getHBaseAdmin().tableExists(tableName)) {
+        UTIL.deleteTable(tableName);
+      }
+      UTIL.createTable(tableName, FAMILIES);
+      Admin admin = UTIL.getHBaseAdmin();
+      // put some stuff in the table
+      Table table = UTIL.getConnection().getTable(tableName);
+      UTIL.loadTable(table, FAMILIES);
+      // split to 2 regions
+      admin.split(tableName, Bytes.toBytes("eee"));
+      TestTableSnapshotInputFormat.blockUntilSplitFinished(UTIL, tableName, 2);
+      Path rootDir = FSUtils.getRootDir(UTIL.getConfiguration());
+      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());
+      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
+        Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);
+      // load different values
+      byte[] value = Bytes.toBytes("after_snapshot_value");
+      UTIL.loadTable(table, FAMILIES, value);
+      // cause flush to create new files in the region
+      admin.flush(tableName);
+      table.close();
+      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan
+      TableSnapshotScanner scanner =
+          new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
+      verifyScanner(scanner, bbb, yyy);
+      scanner.close();
+    } finally {
+      UTIL.getHBaseAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
   @Test
   public void testWithSingleRegion() throws Exception {
     testScanner(UTIL, "testWithSingleRegion", 1, false);
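
verifyScanner, used by testNoDuplicateResultsWhenSplitting above, is an existing
helper in this test class (outside the diff). Conceptually it asserts that every row
in [bbb, yyy) comes back exactly once; a hypothetical reduction of that check:

    // Hypothetical sketch of the uniqueness check the test depends on.
    Set<String> seenRows = new HashSet<>();
    for (Result result = scanner.next(); result != null; result = scanner.next()) {
      String row = Bytes.toString(result.getRow());
      assertTrue("row returned twice: " + row, seenRows.add(row));
    }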

TestTableSnapshotInputFormat.java

@@ -22,14 +22,20 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;

 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Result;
@@ -37,6 +43,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -212,6 +219,69 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {
     }
   }

+  public static void blockUntilSplitFinished(HBaseTestingUtility util, TableName tableName,
+      int expectedRegionSize) throws Exception {
+    for (int i = 0; i < 100; i++) {
+      List<HRegionInfo> hRegionInfoList = util.getHBaseAdmin().getTableRegions(tableName);
+      if (hRegionInfoList.size() >= expectedRegionSize) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+  }
+
+  @Test
+  public void testNoDuplicateResultsWhenSplitting() throws Exception {
+    setupCluster();
+    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
+    String snapshotName = "testSnapshotBug";
+    try {
+      if (UTIL.getHBaseAdmin().tableExists(tableName)) {
+        UTIL.deleteTable(tableName);
+      }
+      UTIL.createTable(tableName, FAMILIES);
+      HBaseAdmin admin = UTIL.getHBaseAdmin();
+      // put some stuff in the table
+      Table table = UTIL.getConnection().getTable(tableName);
+      UTIL.loadTable(table, FAMILIES);
+      // split to 2 regions
+      admin.split(tableName, Bytes.toBytes("eee"));
+      blockUntilSplitFinished(UTIL, tableName, 2);
+      Path rootDir = FSUtils.getRootDir(UTIL.getConfiguration());
+      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());
+      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
+        null, snapshotName, rootDir, fs, true);
+      // load different values
+      byte[] value = Bytes.toBytes("after_snapshot_value");
+      UTIL.loadTable(table, FAMILIES, value);
+      // cause flush to create new files in the region
+      admin.flush(tableName);
+      table.close();
+      Job job = new Job(UTIL.getConfiguration());
+      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+      // limit the scan
+      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
+        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
+        tmpTableDir);
+      verifyWithMockedMapReduce(job, 2, 2, getStartRow(), getEndRow());
+    } finally {
+      UTIL.getHBaseAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
   private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
       byte[] startRow, byte[] stopRow)
       throws IOException, InterruptedException {
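
One caveat on the helper above: blockUntilSplitFinished polls for up to ~100 seconds
but returns silently if the split never completes, deferring the failure to a later
assertion. A stricter variant (a suggestion, not part of this patch) would fail fast:

    // Suggested fail-fast variant (not part of this patch).
    public static void blockUntilSplitFinished(HBaseTestingUtility util, TableName tableName,
        int expectedRegionCount) throws Exception {
      long deadline = System.currentTimeMillis() + 100 * 1000;
      while (util.getHBaseAdmin().getTableRegions(tableName).size() < expectedRegionCount) {
        if (System.currentTimeMillis() > deadline) {
          throw new IllegalStateException("Split of " + tableName + " did not finish in time");
        }
        Thread.sleep(1000);
      }
    }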

SnapshotTestingUtils.java

@@ -245,7 +245,18 @@ public final class SnapshotTestingUtils {
     List<HRegionInfo> regions = admin.getTableRegions(tableName);
     // remove the non-default regions
     RegionReplicaUtil.removeNonDefaultRegions(regions);
-    assertEquals(regions.size(), regionManifests.size());
+    // If a snapshot is taken while the table is splitting, the split parent regions
+    // are included in the snapshot region manifest, so exclude them from the count.
+    int regionCountExclusiveSplitParent = 0;
+    for (SnapshotRegionManifest snapshotRegionManifest : regionManifests.values()) {
+      HRegionInfo hri = HRegionInfo.convert(snapshotRegionManifest.getRegionInfo());
+      if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
+        continue;
+      }
+      regionCountExclusiveSplitParent++;
+    }
+    assertEquals(regions.size(), regionCountExclusiveSplitParent);

     // Verify Regions (redundant check, see MasterSnapshotVerifier)
     for (HRegionInfo info : regions) {