HBASE-16011 TableSnapshotScanner and TableSnapshotInputFormat can produce duplicate rows if split table.
Signed-off-by: tedyu <yuzhihong@gmail.com>
parent 4ab94744e2
commit f441ca0458
@@ -126,9 +126,12 @@ public class TableSnapshotScanner extends AbstractClientScanner {
     htd = meta.getTableDescriptor();
     regions = new ArrayList<>(restoredRegions.size());
-    for (HRegionInfo hri: restoredRegions) {
-      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
-          hri.getStartKey(), hri.getEndKey())) {
+    for (HRegionInfo hri : restoredRegions) {
+      if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
+        continue;
+      }
+      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
+          hri.getEndKey())) {
         regions.add(hri);
       }
     }

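The same guard recurs in the TableSnapshotInputFormatImpl hunk below and in the SnapshotTestingUtils hunk at the end. As a reading aid only (this helper is not part of the patch), here is that filter pulled out as a standalone predicate:

import org.apache.hadoop.hbase.HRegionInfo;

public final class SplitParentFilter {
  private SplitParentFilter() {}

  /**
   * True when hri is an offline parent left behind by a region split.
   * Its daughter regions hold the same data, so scanning the parent as
   * well would return every row in its key range twice.
   */
  public static boolean isSplitParent(HRegionInfo hri) {
    return hri.isOffline() && (hri.isSplit() || hri.isSplitParent());
  }
}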
@@ -275,7 +275,11 @@ public class TableSnapshotInputFormatImpl {
     List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());

     for (SnapshotRegionManifest regionManifest : regionManifests) {
-      regionInfos.add(HRegionInfo.convert(regionManifest.getRegionInfo()));
+      HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());
+      if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
+        continue;
+      }
+      regionInfos.add(hri);
     }
     return regionInfos;
   }
@@ -110,6 +110,56 @@ public class TestTableSnapshotScanner {
     table.close();
   }

+  @Test
+  public void testNoDuplicateResultsWhenSplitting() throws Exception {
+    setupCluster();
+    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
+    String snapshotName = "testSnapshotBug";
+    try {
+      if (UTIL.getAdmin().tableExists(tableName)) {
+        UTIL.deleteTable(tableName);
+      }
+
+      UTIL.createTable(tableName, FAMILIES);
+      Admin admin = UTIL.getAdmin();
+
+      // put some stuff in the table
+      Table table = UTIL.getConnection().getTable(tableName);
+      UTIL.loadTable(table, FAMILIES);
+
+      // split to 2 regions
+      admin.split(tableName, Bytes.toBytes("eee"));
+      TestTableSnapshotInputFormat.blockUntilSplitFinished(UTIL, tableName, 2);
+
+      Path rootDir = FSUtils.getRootDir(UTIL.getConfiguration());
+      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());
+
+      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
+        Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);
+
+      // load different values
+      byte[] value = Bytes.toBytes("after_snapshot_value");
+      UTIL.loadTable(table, FAMILIES, value);
+
+      // cause flush to create new files in the region
+      admin.flush(tableName);
+      table.close();
+
+      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan
+
+      TableSnapshotScanner scanner =
+          new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
+
+      verifyScanner(scanner, bbb, yyy);
+      scanner.close();
+    } finally {
+      UTIL.getAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
   @Test
   public void testWithSingleRegion() throws Exception {
     testScanner(UTIL, "testWithSingleRegion", 1, false);
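For context, a minimal client-side sketch of the API this test exercises. The restore directory and snapshot name are hypothetical, and the try-with-resources form is a stylistic choice, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.util.Bytes;

public class SnapshotScanExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Path restoreDir = new Path("/tmp/snapshot-restore"); // hypothetical scratch dir
    Scan scan = new Scan().withStartRow(Bytes.toBytes("bbb"))
        .withStopRow(Bytes.toBytes("yyy")); // limit the scan, as in the test
    // The scanner restores the snapshot under restoreDir and reads its HFiles directly.
    try (TableSnapshotScanner scanner =
        new TableSnapshotScanner(conf, restoreDir, "mySnapshot", scan)) {
      for (Result r = scanner.next(); r != null; r = scanner.next()) {
        // With this fix, each row comes back exactly once even if the
        // snapshot was taken while the table was splitting.
      }
    }
  }
}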
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.CategoryBasedTimeout;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -55,6 +56,13 @@ import org.junit.rules.TestRule;

 import com.google.common.collect.Lists;

+import java.util.Arrays;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
+
 @Category({VerySlowMapReduceTests.class, LargeTests.class})
 public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {
   @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
@@ -222,6 +230,69 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
     }
   }

+  public static void blockUntilSplitFinished(HBaseTestingUtility util, TableName tableName,
+      int expectedRegionSize) throws Exception {
+    for (int i = 0; i < 100; i++) {
+      List<HRegionInfo> hRegionInfoList = util.getAdmin().getTableRegions(tableName);
+      if (hRegionInfoList.size() >= expectedRegionSize) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+  }
+
+  @Test
+  public void testNoDuplicateResultsWhenSplitting() throws Exception {
+    setupCluster();
+    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
+    String snapshotName = "testSnapshotBug";
+    try {
+      if (UTIL.getAdmin().tableExists(tableName)) {
+        UTIL.deleteTable(tableName);
+      }
+
+      UTIL.createTable(tableName, FAMILIES);
+      Admin admin = UTIL.getAdmin();
+
+      // put some stuff in the table
+      Table table = UTIL.getConnection().getTable(tableName);
+      UTIL.loadTable(table, FAMILIES);
+
+      // split to 2 regions
+      admin.split(tableName, Bytes.toBytes("eee"));
+      blockUntilSplitFinished(UTIL, tableName, 2);
+
+      Path rootDir = FSUtils.getRootDir(UTIL.getConfiguration());
+      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());
+
+      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
+        null, snapshotName, rootDir, fs, true);
+
+      // load different values
+      byte[] value = Bytes.toBytes("after_snapshot_value");
+      UTIL.loadTable(table, FAMILIES, value);
+
+      // cause flush to create new files in the region
+      admin.flush(tableName);
+      table.close();
+
+      Job job = new Job(UTIL.getConfiguration());
+      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+      // limit the scan
+      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());
+
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
+        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
+        tmpTableDir);
+
+      verifyWithMockedMapReduce(job, 2, 2, getStartRow(), getEndRow());
+    } finally {
+      UTIL.getAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
   private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
       byte[] startRow, byte[] stopRow)
       throws IOException, InterruptedException {
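For comparison with the mocked test above, a hedged sketch of wiring a real MapReduce job over a snapshot via the same TableMapReduceUtil entry point. The mapper, job name, snapshot name, and restore directory below are hypothetical:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class SnapshotJobExample {
  // Hypothetical pass-through mapper; any TableMapper subclass works here.
  static class PassThroughMapper extends TableMapper<ImmutableBytesWritable, Result> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      context.write(key, value);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "scan-snapshot");
    Scan scan = new Scan(); // scan the whole snapshot
    TableMapReduceUtil.initTableSnapshotMapperJob("mySnapshot", scan,
        PassThroughMapper.class, ImmutableBytesWritable.class, Result.class, job,
        true, new Path("/tmp/snapshot-restore"));
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}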
@@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.client.SnapshotDescription;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
@@ -251,7 +252,17 @@ public final class SnapshotTestingUtils {
     if (hasMob) {
       assertEquals(regions.size(), regionManifests.size() - 1);
     } else {
-      assertEquals(regions.size(), regionManifests.size());
+      // if create snapshot when table splitting, parent region will be included to the snapshot
+      // region manifest. we should exclude the parent regions.
+      int regionCountExclusiveSplitParent = 0;
+      for (SnapshotRegionManifest snapshotRegionManifest : regionManifests.values()) {
+        HRegionInfo hri = HRegionInfo.convert(snapshotRegionManifest.getRegionInfo());
+        if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
+          continue;
+        }
+        regionCountExclusiveSplitParent++;
+      }
+      assertEquals(regions.size(), regionCountExclusiveSplitParent);
     }

     // Verify Regions (redundant check, see MasterSnapshotVerifier)
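A note on why the manifest can hold more entries than the live region list: after a split, the parent region lingers offline until the catalog janitor cleans it up, so a snapshot taken in that window records the parent alongside both daughters. A minimal sketch of the flags involved (constructed values are illustrative, not taken from the patch):

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;

public class SplitParentFlagsExample {
  public static void main(String[] args) {
    HRegionInfo parent = new HRegionInfo(TableName.valueOf("t"));
    parent.setOffline(true); // parent is taken offline once its daughters open
    parent.setSplit(true);   // and flagged as split
    // This is exactly the state the patch filters out everywhere:
    boolean excluded = parent.isOffline() && (parent.isSplit() || parent.isSplitParent());
    System.out.println("excluded = " + excluded); // prints: excluded = true
  }
}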