HBASE-21977 Skip replay WAL and update seqid when open regions restored from snapshot
Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
parent
b6afc5fbc1
commit
8fd1b0afe9
|
@ -60,6 +60,7 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
|
||||||
// open region from the snapshot directory
|
// open region from the snapshot directory
|
||||||
region = HRegion.newHRegion(FSUtils.getTableDir(rootDir, htd.getTableName()), null, fs, conf,
|
region = HRegion.newHRegion(FSUtils.getTableDir(rootDir, htd.getTableName()), null, fs, conf,
|
||||||
hri, htd, null);
|
hri, htd, null);
|
||||||
|
region.setRestoredRegion(true);
|
||||||
// we won't initialize the MobFileCache when not running in RS process. so provided an
|
// we won't initialize the MobFileCache when not running in RS process. so provided an
|
||||||
// initialized cache. Consider the case: an CF was set from an mob to non-mob. if we only
|
// initialized cache. Consider the case: an CF was set from an mob to non-mob. if we only
|
||||||
// initialize cache for MOB region, NPE from HMobStore will still happen. So Initialize the
|
// initialize cache for MOB region, NPE from HMobStore will still happen. So Initialize the
|
||||||
|
|
|
@ -338,6 +338,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
private Path regionDir;
|
private Path regionDir;
|
||||||
private FileSystem walFS;
|
private FileSystem walFS;
|
||||||
|
|
||||||
|
// set to true if the region is restored from snapshot
|
||||||
|
private boolean isRestoredRegion = false;
|
||||||
|
|
||||||
|
public void setRestoredRegion(boolean restoredRegion) {
|
||||||
|
isRestoredRegion = restoredRegion;
|
||||||
|
}
|
||||||
|
|
||||||
// The internal wait duration to acquire a lock before read/update
|
// The internal wait duration to acquire a lock before read/update
|
||||||
// from the region. It is not per row. The purpose of this wait time
|
// from the region. It is not per row. The purpose of this wait time
|
||||||
// is to avoid waiting a long time while the region is busy, so that
|
// is to avoid waiting a long time while the region is busy, so that
|
||||||
|
@ -951,7 +958,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
status.setStatus("Initializing all the Stores");
|
status.setStatus("Initializing all the Stores");
|
||||||
long maxSeqId = initializeStores(reporter, status);
|
long maxSeqId = initializeStores(reporter, status);
|
||||||
this.mvcc.advanceTo(maxSeqId);
|
this.mvcc.advanceTo(maxSeqId);
|
||||||
if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
|
if (!isRestoredRegion && ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
|
||||||
Collection<HStore> stores = this.stores.values();
|
Collection<HStore> stores = this.stores.values();
|
||||||
try {
|
try {
|
||||||
// update the stores that we are replaying
|
// update the stores that we are replaying
|
||||||
|
@ -1000,15 +1007,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
|
|
||||||
// Use maximum of log sequenceid or that which was found in stores
|
// Use maximum of log sequenceid or that which was found in stores
|
||||||
// (particularly if no recovered edits, seqid will be -1).
|
// (particularly if no recovered edits, seqid will be -1).
|
||||||
long maxSeqIdFromFile =
|
long nextSeqId = maxSeqId + 1;
|
||||||
WALSplitter.getMaxRegionSequenceId(getWalFileSystem(), getWALRegionDirOfDefaultReplica());
|
if (!isRestoredRegion) {
|
||||||
long nextSeqId = Math.max(maxSeqId, maxSeqIdFromFile) + 1;
|
long maxSeqIdFromFile =
|
||||||
// The openSeqNum will always be increase even for read only region, as we rely on it to
|
WALSplitter.getMaxRegionSequenceId(getWalFileSystem(), getWALRegionDirOfDefaultReplica());
|
||||||
// determine whether a region has been successfully reopend, so here we always need to update
|
nextSeqId = Math.max(maxSeqId, maxSeqIdFromFile) + 1;
|
||||||
// the max sequence id file.
|
// The openSeqNum will always be increase even for read only region, as we rely on it to
|
||||||
if (RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
|
// determine whether a region has been successfully reopend, so here we always need to update
|
||||||
LOG.debug("writing seq id for {}", this.getRegionInfo().getEncodedName());
|
// the max sequence id file.
|
||||||
WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), nextSeqId - 1);
|
if (RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
|
||||||
|
LOG.debug("writing seq id for {}", this.getRegionInfo().getEncodedName());
|
||||||
|
WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), nextSeqId - 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Opened {}; next sequenceid={}", this.getRegionInfo().getShortNameToLog(), nextSeqId);
|
LOG.info("Opened {}; next sequenceid={}", this.getRegionInfo().getShortNameToLog(), nextSeqId);
|
||||||
|
|
|
@ -31,12 +31,14 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
|
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
|
||||||
import org.apache.hadoop.hbase.io.HFileLink;
|
import org.apache.hadoop.hbase.io.HFileLink;
|
||||||
import org.apache.hadoop.hbase.mob.MobUtils;
|
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||||
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils.SnapshotMock;
|
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils.SnapshotMock;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -136,6 +139,45 @@ public class TestRestoreSnapshotHelper {
|
||||||
checkNoHFileLinkInTableDir(tableName);
|
checkNoHFileLinkInTableDir(tableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSkipReplayAndUpdateSeqId() throws Exception {
|
||||||
|
rootDir = TEST_UTIL.getDefaultRootDirPath();
|
||||||
|
FSUtils.setRootDir(conf, rootDir);
|
||||||
|
TableName tableName = TableName.valueOf("testSkipReplayAndUpdateSeqId");
|
||||||
|
String snapshotName = "testSkipReplayAndUpdateSeqId";
|
||||||
|
createTableAndSnapshot(tableName, snapshotName);
|
||||||
|
// put some data in the table
|
||||||
|
Table table = TEST_UTIL.getConnection().getTable(tableName);
|
||||||
|
TEST_UTIL.loadTable(table, Bytes.toBytes("A"));
|
||||||
|
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
Path rootDir = FSUtils.getRootDir(conf);
|
||||||
|
Path restoreDir = new Path("/hbase/.tmp-restore/testScannerWithRestoreScanner2");
|
||||||
|
// restore snapshot.
|
||||||
|
final RestoreSnapshotHelper.RestoreMetaChanges meta =
|
||||||
|
RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
|
||||||
|
TableDescriptor htd = meta.getTableDescriptor();
|
||||||
|
final List<RegionInfo> restoredRegions = meta.getRegionsToAdd();
|
||||||
|
for (RegionInfo restoredRegion : restoredRegions) {
|
||||||
|
// open restored region
|
||||||
|
HRegion region = HRegion.newHRegion(FSUtils.getTableDir(restoreDir, tableName), null, fs,
|
||||||
|
conf, restoredRegion, htd, null);
|
||||||
|
// set restore flag
|
||||||
|
region.setRestoredRegion(true);
|
||||||
|
region.initialize();
|
||||||
|
Path recoveredEdit =
|
||||||
|
FSUtils.getWALRegionDir(conf, tableName, region.getRegionInfo().getEncodedName());
|
||||||
|
long maxSeqId = WALSplitter.getMaxRegionSequenceId(fs, recoveredEdit);
|
||||||
|
|
||||||
|
// open restored region without set restored flag
|
||||||
|
HRegion region2 = HRegion.newHRegion(FSUtils.getTableDir(restoreDir, tableName), null, fs,
|
||||||
|
conf, restoredRegion, htd, null);
|
||||||
|
region2.initialize();
|
||||||
|
long maxSeqId2 = WALSplitter.getMaxRegionSequenceId(fs, recoveredEdit);
|
||||||
|
Assert.assertTrue(maxSeqId2 > maxSeqId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected void createTableAndSnapshot(TableName tableName, String snapshotName)
|
protected void createTableAndSnapshot(TableName tableName, String snapshotName)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
byte[] column = Bytes.toBytes("A");
|
byte[] column = Bytes.toBytes("A");
|
||||||
|
|
Loading…
Reference in New Issue