HBASE-21977 Skip replay WAL and update seqid when open regions restored from snapshot
Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
parent
a54f0bfb8f
commit
908dea8e41
|
@ -60,6 +60,7 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
|
|||
// open region from the snapshot directory
|
||||
region = HRegion.newHRegion(FSUtils.getTableDir(rootDir, htd.getTableName()), null, fs, conf,
|
||||
hri, htd, null);
|
||||
region.setRestoredRegion(true);
|
||||
// we won't initialize the MobFileCache when not running in RS process. so provided an
|
||||
// initialized cache. Consider the case: an CF was set from an mob to non-mob. if we only
|
||||
// initialize cache for MOB region, NPE from HMobStore will still happen. So Initialize the
|
||||
|
|
|
@ -340,6 +340,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
private Path regionDir;
|
||||
private FileSystem walFS;
|
||||
|
||||
// set to true if the region is restored from snapshot
|
||||
private boolean isRestoredRegion = false;
|
||||
|
||||
public void setRestoredRegion(boolean restoredRegion) {
|
||||
isRestoredRegion = restoredRegion;
|
||||
}
|
||||
|
||||
// The internal wait duration to acquire a lock before read/update
|
||||
// from the region. It is not per row. The purpose of this wait time
|
||||
// is to avoid waiting a long time while the region is busy, so that
|
||||
|
@ -953,7 +960,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
status.setStatus("Initializing all the Stores");
|
||||
long maxSeqId = initializeStores(reporter, status);
|
||||
this.mvcc.advanceTo(maxSeqId);
|
||||
if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
|
||||
if (!isRestoredRegion && ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
|
||||
Collection<HStore> stores = this.stores.values();
|
||||
try {
|
||||
// update the stores that we are replaying
|
||||
|
@ -1006,15 +1013,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
// Use maximum of log sequenceid or that which was found in stores
|
||||
// (particularly if no recovered edits, seqid will be -1).
|
||||
long maxSeqIdFromFile =
|
||||
WALSplitter.getMaxRegionSequenceId(getWalFileSystem(), getWALRegionDirOfDefaultReplica());
|
||||
long nextSeqId = Math.max(maxSeqId, maxSeqIdFromFile) + 1;
|
||||
// The openSeqNum will always be increase even for read only region, as we rely on it to
|
||||
// determine whether a region has been successfully reopend, so here we always need to update
|
||||
// the max sequence id file.
|
||||
if (RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
|
||||
LOG.debug("writing seq id for {}", this.getRegionInfo().getEncodedName());
|
||||
WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), nextSeqId - 1);
|
||||
long nextSeqId = maxSeqId + 1;
|
||||
if (!isRestoredRegion) {
|
||||
long maxSeqIdFromFile =
|
||||
WALSplitter.getMaxRegionSequenceId(getWalFileSystem(), getWALRegionDirOfDefaultReplica());
|
||||
nextSeqId = Math.max(maxSeqId, maxSeqIdFromFile) + 1;
|
||||
// The openSeqNum will always be increase even for read only region, as we rely on it to
|
||||
// determine whether a region has been successfully reopend, so here we always need to update
|
||||
// the max sequence id file.
|
||||
if (RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
|
||||
LOG.debug("writing seq id for {}", this.getRegionInfo().getEncodedName());
|
||||
WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), nextSeqId - 1);
|
||||
}
|
||||
}
|
||||
|
||||
LOG.info("Opened {}; next sequenceid={}", this.getRegionInfo().getShortNameToLog(), nextSeqId);
|
||||
|
|
|
@ -31,12 +31,14 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
|
||||
import org.apache.hadoop.hbase.io.HFileLink;
|
||||
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils.SnapshotMock;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
|
@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
|
@ -136,6 +139,45 @@ public class TestRestoreSnapshotHelper {
|
|||
checkNoHFileLinkInTableDir(tableName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSkipReplayAndUpdateSeqId() throws Exception {
|
||||
rootDir = TEST_UTIL.getDefaultRootDirPath();
|
||||
FSUtils.setRootDir(conf, rootDir);
|
||||
TableName tableName = TableName.valueOf("testSkipReplayAndUpdateSeqId");
|
||||
String snapshotName = "testSkipReplayAndUpdateSeqId";
|
||||
createTableAndSnapshot(tableName, snapshotName);
|
||||
// put some data in the table
|
||||
Table table = TEST_UTIL.getConnection().getTable(tableName);
|
||||
TEST_UTIL.loadTable(table, Bytes.toBytes("A"));
|
||||
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
Path rootDir = FSUtils.getRootDir(conf);
|
||||
Path restoreDir = new Path("/hbase/.tmp-restore/testScannerWithRestoreScanner2");
|
||||
// restore snapshot.
|
||||
final RestoreSnapshotHelper.RestoreMetaChanges meta =
|
||||
RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
|
||||
TableDescriptor htd = meta.getTableDescriptor();
|
||||
final List<RegionInfo> restoredRegions = meta.getRegionsToAdd();
|
||||
for (RegionInfo restoredRegion : restoredRegions) {
|
||||
// open restored region
|
||||
HRegion region = HRegion.newHRegion(FSUtils.getTableDir(restoreDir, tableName), null, fs,
|
||||
conf, restoredRegion, htd, null);
|
||||
// set restore flag
|
||||
region.setRestoredRegion(true);
|
||||
region.initialize();
|
||||
Path recoveredEdit =
|
||||
FSUtils.getWALRegionDir(conf, tableName, region.getRegionInfo().getEncodedName());
|
||||
long maxSeqId = WALSplitter.getMaxRegionSequenceId(fs, recoveredEdit);
|
||||
|
||||
// open restored region without set restored flag
|
||||
HRegion region2 = HRegion.newHRegion(FSUtils.getTableDir(restoreDir, tableName), null, fs,
|
||||
conf, restoredRegion, htd, null);
|
||||
region2.initialize();
|
||||
long maxSeqId2 = WALSplitter.getMaxRegionSequenceId(fs, recoveredEdit);
|
||||
Assert.assertTrue(maxSeqId2 > maxSeqId);
|
||||
}
|
||||
}
|
||||
|
||||
protected void createTableAndSnapshot(TableName tableName, String snapshotName)
|
||||
throws IOException {
|
||||
byte[] column = Bytes.toBytes("A");
|
||||
|
|
Loading…
Reference in New Issue