HBASE-26640 Reimplement master local region initialization to better work with SFT (#4111)
Signed-off-by: Josh Elser <elserj@apache.org> Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
parent
1d3d35cbe9
commit
a3d1419bdd
|
@ -781,7 +781,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
// done with in a one row put, which means if we have data in catalog family then we can
|
// done with in a one row put, which means if we have data in catalog family then we can
|
||||||
// make sure that the migration is done.
|
// make sure that the migration is done.
|
||||||
LOG.info("The {} family in master local region already has data in it, skip migrating...",
|
LOG.info("The {} family in master local region already has data in it, skip migrating...",
|
||||||
HConstants.CATALOG_FAMILY);
|
HConstants.CATALOG_FAMILY_STR);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4081,7 +4081,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
|
|
||||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||||
allowedOnPath = ".*/src/test/.*")
|
allowedOnPath = ".*/src/test/.*")
|
||||||
MasterRegion getMasterRegion() {
|
public MasterRegion getMasterRegion() {
|
||||||
return masterRegion;
|
return masterRegion;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.region;
|
||||||
import static org.apache.hadoop.hbase.HConstants.HREGION_LOGDIR_NAME;
|
import static org.apache.hadoop.hbase.HConstants.HREGION_LOGDIR_NAME;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -27,6 +28,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseIOException;
|
import org.apache.hadoop.hbase.HBaseIOException;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
|
@ -34,13 +36,18 @@ import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
|
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
|
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
|
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
|
||||||
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
|
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
|
||||||
|
@ -92,6 +99,10 @@ public final class MasterRegion {
|
||||||
|
|
||||||
private static final String DEAD_WAL_DIR_SUFFIX = "-dead";
|
private static final String DEAD_WAL_DIR_SUFFIX = "-dead";
|
||||||
|
|
||||||
|
static final String INITIALIZING_FLAG = ".initializing";
|
||||||
|
|
||||||
|
static final String INITIALIZED_FLAG = ".initialized";
|
||||||
|
|
||||||
private static final int REGION_ID = 1;
|
private static final int REGION_ID = 1;
|
||||||
|
|
||||||
private final WALFactory walFactory;
|
private final WALFactory walFactory;
|
||||||
|
@ -196,32 +207,39 @@ public final class MasterRegion {
|
||||||
|
|
||||||
private static HRegion bootstrap(Configuration conf, TableDescriptor td, FileSystem fs,
|
private static HRegion bootstrap(Configuration conf, TableDescriptor td, FileSystem fs,
|
||||||
Path rootDir, FileSystem walFs, Path walRootDir, WALFactory walFactory,
|
Path rootDir, FileSystem walFs, Path walRootDir, WALFactory walFactory,
|
||||||
MasterRegionWALRoller walRoller, String serverName) throws IOException {
|
MasterRegionWALRoller walRoller, String serverName, boolean touchInitializingFlag)
|
||||||
|
throws IOException {
|
||||||
TableName tn = td.getTableName();
|
TableName tn = td.getTableName();
|
||||||
RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setRegionId(REGION_ID).build();
|
RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setRegionId(REGION_ID).build();
|
||||||
Path tmpTableDir = CommonFSUtils.getTableDir(rootDir,
|
|
||||||
TableName.valueOf(tn.getNamespaceAsString(), tn.getQualifierAsString() + "-tmp"));
|
|
||||||
if (fs.exists(tmpTableDir) && !fs.delete(tmpTableDir, true)) {
|
|
||||||
throw new IOException("Can not delete partial created proc region " + tmpTableDir);
|
|
||||||
}
|
|
||||||
HRegion.createHRegion(conf, regionInfo, fs, tmpTableDir, td).close();
|
|
||||||
Path tableDir = CommonFSUtils.getTableDir(rootDir, tn);
|
Path tableDir = CommonFSUtils.getTableDir(rootDir, tn);
|
||||||
if (!fs.rename(tmpTableDir, tableDir)) {
|
// persist table descriptor
|
||||||
throw new IOException("Can not rename " + tmpTableDir + " to " + tableDir);
|
FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, true);
|
||||||
|
HRegion.createHRegion(conf, regionInfo, fs, tableDir, td).close();
|
||||||
|
Path initializedFlag = new Path(tableDir, INITIALIZED_FLAG);
|
||||||
|
if (!fs.mkdirs(initializedFlag)) {
|
||||||
|
throw new IOException("Can not touch initialized flag: " + initializedFlag);
|
||||||
|
}
|
||||||
|
Path initializingFlag = new Path(tableDir, INITIALIZING_FLAG);
|
||||||
|
if (!fs.delete(initializingFlag, true)) {
|
||||||
|
LOG.warn("failed to clean up initializing flag: " + initializingFlag);
|
||||||
}
|
}
|
||||||
WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
|
WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
|
||||||
return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
|
return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static HRegion open(Configuration conf, TableDescriptor td, FileSystem fs, Path rootDir,
|
private static RegionInfo loadRegionInfo(FileSystem fs, Path tableDir) throws IOException {
|
||||||
FileSystem walFs, Path walRootDir, WALFactory walFactory, MasterRegionWALRoller walRoller,
|
// on branch-2, the RegionInfo.isEncodedRegionName will returns true for .initializing and
|
||||||
String serverName) throws IOException {
|
// .initialized, see HBASE-25368. Since RegionInfo is IA.Public, changing the implementation may
|
||||||
Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
|
// raise compatibility concerns, so here we just skip them by our own.
|
||||||
Path regionDir =
|
Path regionDir = fs.listStatus(tableDir, p -> !p.getName().startsWith(".")
|
||||||
fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
|
&& RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0].getPath();
|
||||||
.getPath();
|
return HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
|
||||||
RegionInfo regionInfo = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
|
}
|
||||||
|
|
||||||
|
private static HRegion open(Configuration conf, TableDescriptor td, RegionInfo regionInfo,
|
||||||
|
FileSystem fs, Path rootDir, FileSystem walFs, Path walRootDir, WALFactory walFactory,
|
||||||
|
MasterRegionWALRoller walRoller, String serverName) throws IOException {
|
||||||
|
Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
|
||||||
Path walRegionDir = FSUtils.getRegionDirFromRootDir(walRootDir, regionInfo);
|
Path walRegionDir = FSUtils.getRegionDirFromRootDir(walRootDir, regionInfo);
|
||||||
Path replayEditsDir = new Path(walRegionDir, REPLAY_EDITS_DIR);
|
Path replayEditsDir = new Path(walRegionDir, REPLAY_EDITS_DIR);
|
||||||
if (!walFs.exists(replayEditsDir) && !walFs.mkdirs(replayEditsDir)) {
|
if (!walFs.exists(replayEditsDir) && !walFs.mkdirs(replayEditsDir)) {
|
||||||
|
@ -287,6 +305,39 @@ public final class MasterRegion {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void tryMigrate(Configuration conf, FileSystem fs, Path tableDir,
|
||||||
|
RegionInfo regionInfo, TableDescriptor oldTd, TableDescriptor newTd) throws IOException {
|
||||||
|
Class<? extends StoreFileTracker> oldSft =
|
||||||
|
StoreFileTrackerFactory.getTrackerClass(oldTd.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
|
||||||
|
Class<? extends StoreFileTracker> newSft =
|
||||||
|
StoreFileTrackerFactory.getTrackerClass(newTd.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
|
||||||
|
if (oldSft.equals(newSft)) {
|
||||||
|
LOG.debug("old store file tracker {} is the same with new store file tracker, skip migration",
|
||||||
|
StoreFileTrackerFactory.getStoreFileTrackerName(oldSft));
|
||||||
|
if (!oldTd.equals(newTd)) {
|
||||||
|
// we may change other things such as adding a new family, so here we still need to persist
|
||||||
|
// the new table descriptor
|
||||||
|
LOG.info("Update table descriptor from {} to {}", oldTd, newTd);
|
||||||
|
FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, newTd, true);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG.info("Migrate store file tracker from {} to {}", oldSft.getSimpleName(),
|
||||||
|
newSft.getSimpleName());
|
||||||
|
HRegionFileSystem hfs =
|
||||||
|
HRegionFileSystem.openRegionFromFileSystem(conf, fs, tableDir, regionInfo, false);
|
||||||
|
for (ColumnFamilyDescriptor oldCfd : oldTd.getColumnFamilies()) {
|
||||||
|
StoreFileTracker oldTracker = StoreFileTrackerFactory.create(conf, oldTd, oldCfd, hfs);
|
||||||
|
StoreFileTracker newTracker = StoreFileTrackerFactory.create(conf, oldTd, oldCfd, hfs);
|
||||||
|
List<StoreFileInfo> files = oldTracker.load();
|
||||||
|
LOG.debug("Store file list for {}: {}", oldCfd.getNameAsString(), files);
|
||||||
|
newTracker.set(oldTracker.load());
|
||||||
|
}
|
||||||
|
// persist the new table descriptor after migration
|
||||||
|
LOG.info("Update table descriptor from {} to {}", oldTd, newTd);
|
||||||
|
FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, newTd, true);
|
||||||
|
}
|
||||||
|
|
||||||
public static MasterRegion create(MasterRegionParams params) throws IOException {
|
public static MasterRegion create(MasterRegionParams params) throws IOException {
|
||||||
TableDescriptor td = params.tableDescriptor();
|
TableDescriptor td = params.tableDescriptor();
|
||||||
LOG.info("Create or load local region for table " + td);
|
LOG.info("Create or load local region for table " + td);
|
||||||
|
@ -321,16 +372,58 @@ public final class MasterRegion {
|
||||||
|
|
||||||
WALFactory walFactory = new WALFactory(conf, server.getServerName().toString());
|
WALFactory walFactory = new WALFactory(conf, server.getServerName().toString());
|
||||||
Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
|
Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
|
||||||
|
Path initializingFlag = new Path(tableDir, INITIALIZING_FLAG);
|
||||||
|
Path initializedFlag = new Path(tableDir, INITIALIZED_FLAG);
|
||||||
HRegion region;
|
HRegion region;
|
||||||
if (fs.exists(tableDir)) {
|
if (!fs.exists(tableDir)) {
|
||||||
// load the existing region.
|
// bootstrap, no doubt
|
||||||
region = open(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
|
if (!fs.mkdirs(initializedFlag)) {
|
||||||
|
throw new IOException("Can not touch initialized flag");
|
||||||
|
}
|
||||||
|
region = bootstrap(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
|
||||||
|
server.getServerName().toString(), true);
|
||||||
|
} else {
|
||||||
|
if (!fs.exists(initializedFlag)) {
|
||||||
|
if (!fs.exists(initializingFlag)) {
|
||||||
|
// should be old style, where we do not have the initializing or initialized file, persist
|
||||||
|
// the table descriptor, touch the initialized flag and then open the region.
|
||||||
|
// the store file tracker must be DEFAULT
|
||||||
|
LOG.info("No {} or {} file, try upgrading", INITIALIZING_FLAG, INITIALIZED_FLAG);
|
||||||
|
TableDescriptor oldTd =
|
||||||
|
TableDescriptorBuilder.newBuilder(td).setValue(StoreFileTrackerFactory.TRACKER_IMPL,
|
||||||
|
StoreFileTrackerFactory.Trackers.DEFAULT.name()).build();
|
||||||
|
FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, oldTd, true);
|
||||||
|
if (!fs.mkdirs(initializedFlag)) {
|
||||||
|
throw new IOException("Can not touch initialized flag: " + initializedFlag);
|
||||||
|
}
|
||||||
|
RegionInfo regionInfo = loadRegionInfo(fs, tableDir);
|
||||||
|
tryMigrate(conf, fs, tableDir, regionInfo, oldTd, td);
|
||||||
|
region = open(conf, td, regionInfo, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
|
||||||
server.getServerName().toString());
|
server.getServerName().toString());
|
||||||
} else {
|
} else {
|
||||||
// bootstrapping...
|
// delete all contents besides the initializing flag, here we can make sure tableDir
|
||||||
|
// exists(unless someone delete it manually...), so we do not do null check here.
|
||||||
|
for (FileStatus status : fs.listStatus(tableDir)) {
|
||||||
|
if (!status.getPath().getName().equals(INITIALIZING_FLAG)) {
|
||||||
|
fs.delete(status.getPath(), true);
|
||||||
|
}
|
||||||
|
}
|
||||||
region = bootstrap(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
|
region = bootstrap(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
|
||||||
|
server.getServerName().toString(), false);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (fs.exists(initializingFlag) && !fs.delete(initializingFlag, true)) {
|
||||||
|
LOG.warn("failed to clean up initializing flag: " + initializingFlag);
|
||||||
|
}
|
||||||
|
// open it, make sure to load the table descriptor from fs
|
||||||
|
TableDescriptor oldTd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
|
||||||
|
RegionInfo regionInfo = loadRegionInfo(fs, tableDir);
|
||||||
|
tryMigrate(conf, fs, tableDir, regionInfo, oldTd, td);
|
||||||
|
region = open(conf, td, regionInfo, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
|
||||||
server.getServerName().toString());
|
server.getServerName().toString());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Path globalArchiveDir = HFileArchiveUtil.getArchivePath(baseConf);
|
Path globalArchiveDir = HFileArchiveUtil.getArchivePath(baseConf);
|
||||||
MasterRegionFlusherAndCompactor flusherAndCompactor = new MasterRegionFlusherAndCompactor(conf,
|
MasterRegionFlusherAndCompactor flusherAndCompactor = new MasterRegionFlusherAndCompactor(conf,
|
||||||
server, region, params.flushSize(), params.flushPerChanges(), params.flushIntervalMs(),
|
server, region, params.flushSize(), params.flushPerChanges(), params.flushIntervalMs(),
|
||||||
|
|
|
@ -28,7 +28,10 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -78,6 +81,8 @@ public final class MasterRegionFactory {
|
||||||
|
|
||||||
private static final int DEFAULT_RING_BUFFER_SLOT_COUNT = 128;
|
private static final int DEFAULT_RING_BUFFER_SLOT_COUNT = 128;
|
||||||
|
|
||||||
|
public static final String TRACKER_IMPL = "hbase.master.store.region.file-tracker.impl";
|
||||||
|
|
||||||
public static final TableName TABLE_NAME = TableName.valueOf("master:store");
|
public static final TableName TABLE_NAME = TableName.valueOf("master:store");
|
||||||
|
|
||||||
public static final byte[] PROC_FAMILY = Bytes.toBytes("proc");
|
public static final byte[] PROC_FAMILY = Bytes.toBytes("proc");
|
||||||
|
@ -89,10 +94,23 @@ public final class MasterRegionFactory {
|
||||||
.setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).build())
|
.setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).build())
|
||||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(PROC_FAMILY)).build();
|
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(PROC_FAMILY)).build();
|
||||||
|
|
||||||
|
private static TableDescriptor withTrackerConfigs(Configuration conf) {
|
||||||
|
String trackerImpl = conf.get(TRACKER_IMPL, conf.get(StoreFileTrackerFactory.TRACKER_IMPL,
|
||||||
|
StoreFileTrackerFactory.Trackers.DEFAULT.name()));
|
||||||
|
Class<? extends StoreFileTracker> trackerClass =
|
||||||
|
StoreFileTrackerFactory.getTrackerClass(trackerImpl);
|
||||||
|
if (StoreFileTrackerFactory.isMigration(trackerClass)) {
|
||||||
|
throw new IllegalArgumentException("Should not set store file tracker to " +
|
||||||
|
StoreFileTrackerFactory.Trackers.MIGRATION.name() + " for master local region");
|
||||||
|
}
|
||||||
|
StoreFileTracker tracker = ReflectionUtils.newInstance(trackerClass, conf, true, null);
|
||||||
|
return tracker.updateWithTrackerConfigs(TableDescriptorBuilder.newBuilder(TABLE_DESC)).build();
|
||||||
|
}
|
||||||
|
|
||||||
public static MasterRegion create(Server server) throws IOException {
|
public static MasterRegion create(Server server) throws IOException {
|
||||||
MasterRegionParams params = new MasterRegionParams().server(server)
|
|
||||||
.regionDirName(MASTER_STORE_DIR).tableDescriptor(TABLE_DESC);
|
|
||||||
Configuration conf = server.getConfiguration();
|
Configuration conf = server.getConfiguration();
|
||||||
|
MasterRegionParams params = new MasterRegionParams().server(server)
|
||||||
|
.regionDirName(MASTER_STORE_DIR).tableDescriptor(withTrackerConfigs(conf));
|
||||||
long flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
|
long flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
|
||||||
long flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
|
long flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
|
||||||
long flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
|
long flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
|
||||||
|
|
|
@ -84,7 +84,7 @@ public final class StoreFileTrackerFactory {
|
||||||
return conf.get(TRACKER_IMPL, Trackers.DEFAULT.name());
|
return conf.get(TRACKER_IMPL, Trackers.DEFAULT.name());
|
||||||
}
|
}
|
||||||
|
|
||||||
static String getStoreFileTrackerName(Class<? extends StoreFileTracker> clazz) {
|
public static String getStoreFileTrackerName(Class<? extends StoreFileTracker> clazz) {
|
||||||
Trackers name = CLASS_TO_ENUM.get(clazz);
|
Trackers name = CLASS_TO_ENUM.get(clazz);
|
||||||
return name != null ? name.name() : clazz.getName();
|
return name != null ? name.name() : clazz.getName();
|
||||||
}
|
}
|
||||||
|
@ -184,4 +184,8 @@ public final class StoreFileTrackerFactory {
|
||||||
}
|
}
|
||||||
return descriptor;
|
return descriptor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static boolean isMigration(Class<?> clazz) {
|
||||||
|
return MigrationStoreFileTracker.class.isAssignableFrom(clazz);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -511,6 +511,13 @@ public class FSTableDescriptors implements TableDescriptors {
|
||||||
return td != null ? Optional.of(Pair.newPair(descFile, td)) : Optional.empty();
|
return td != null ? Optional.of(Pair.newPair(descFile, td)) : Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||||
|
allowedOnPath = ".*/src/test/.*")
|
||||||
|
public static void deleteTableDescriptors(FileSystem fs, Path tableDir) throws IOException {
|
||||||
|
Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);
|
||||||
|
deleteTableDescriptorFiles(fs, tableInfoDir, Integer.MAX_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deletes files matching the table info file pattern within the given directory whose sequenceId
|
* Deletes files matching the table info file pattern within the given directory whose sequenceId
|
||||||
* is at most the given max sequenceId.
|
* is at most the given max sequenceId.
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
|
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
|
||||||
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
|
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
@ -85,19 +86,24 @@ public class MasterRegionTestBase {
|
||||||
/**
|
/**
|
||||||
* Creates a new MasterRegion using an existing {@code htu} on this class.
|
* Creates a new MasterRegion using an existing {@code htu} on this class.
|
||||||
*/
|
*/
|
||||||
protected void createMasterRegion() throws IOException {
|
protected final void createMasterRegion() throws IOException {
|
||||||
configure(htu.getConfiguration());
|
Configuration conf = htu.getConfiguration();
|
||||||
|
configure(conf);
|
||||||
choreService = new ChoreService(getClass().getSimpleName());
|
choreService = new ChoreService(getClass().getSimpleName());
|
||||||
cleanerPool = new DirScanPool(htu.getConfiguration());
|
cleanerPool = new DirScanPool(htu.getConfiguration());
|
||||||
Server server = mock(Server.class);
|
Server server = mock(Server.class);
|
||||||
when(server.getConfiguration()).thenReturn(htu.getConfiguration());
|
when(server.getConfiguration()).thenReturn(conf);
|
||||||
when(server.getServerName())
|
when(server.getServerName())
|
||||||
.thenReturn(ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime()));
|
.thenReturn(ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime()));
|
||||||
when(server.getChoreService()).thenReturn(choreService);
|
when(server.getChoreService()).thenReturn(choreService);
|
||||||
Path testDir = htu.getDataTestDir();
|
Path testDir = htu.getDataTestDir();
|
||||||
CommonFSUtils.setRootDir(htu.getConfiguration(), testDir);
|
CommonFSUtils.setRootDir(conf, testDir);
|
||||||
MasterRegionParams params = new MasterRegionParams();
|
MasterRegionParams params = new MasterRegionParams();
|
||||||
params.server(server).regionDirName(REGION_DIR_NAME).tableDescriptor(TD)
|
TableDescriptor td = TableDescriptorBuilder
|
||||||
|
.newBuilder(TD).setValue(StoreFileTrackerFactory.TRACKER_IMPL, conf
|
||||||
|
.get(StoreFileTrackerFactory.TRACKER_IMPL, StoreFileTrackerFactory.Trackers.DEFAULT.name()))
|
||||||
|
.build();
|
||||||
|
params.server(server).regionDirName(REGION_DIR_NAME).tableDescriptor(td)
|
||||||
.flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000)
|
.flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000)
|
||||||
.flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(4).maxWals(32).useHsync(false)
|
.flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(4).maxWals(32).useHsync(false)
|
||||||
.ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15))
|
.ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15))
|
||||||
|
|
|
@ -0,0 +1,86 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.region;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make sure we do not loss data after changing SFT implementation
|
||||||
|
*/
|
||||||
|
@Category({ MasterTests.class, MediumTests.class })
|
||||||
|
public class TestChangeSFTForMasterRegion {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestChangeSFTForMasterRegion.class);
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static TableName NAME = TableName.valueOf("test");
|
||||||
|
|
||||||
|
private static byte[] FAMILY = Bytes.toBytes("family");
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
UTIL.getConfiguration().set(MasterRegionFactory.TRACKER_IMPL,
|
||||||
|
StoreFileTrackerFactory.Trackers.DEFAULT.name());
|
||||||
|
// use zk connection registry, as we will shutdown the only master instance which will likely to
|
||||||
|
// lead to dead loop
|
||||||
|
UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
|
||||||
|
HConstants.ZK_CONNECTION_REGISTRY_CLASS);
|
||||||
|
UTIL.startMiniCluster(1);
|
||||||
|
UTIL.createTable(NAME, FAMILY).close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws IOException {
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() throws Exception {
|
||||||
|
// shutdown master
|
||||||
|
UTIL.getMiniHBaseCluster().stopMaster(0).join();
|
||||||
|
UTIL.getMiniHBaseCluster().getConf().set(MasterRegionFactory.TRACKER_IMPL,
|
||||||
|
StoreFileTrackerFactory.Trackers.FILE.name());
|
||||||
|
UTIL.getMiniHBaseCluster().startMaster();
|
||||||
|
// make sure that the table still exists
|
||||||
|
UTIL.waitTableAvailable(NAME);
|
||||||
|
// confirm that we have changed the SFT to FILE
|
||||||
|
TableDescriptor td =
|
||||||
|
UTIL.getMiniHBaseCluster().getMaster().getMasterRegion().region.getTableDescriptor();
|
||||||
|
assertEquals(StoreFileTrackerFactory.Trackers.FILE.name(),
|
||||||
|
td.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,117 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.region;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({ MasterTests.class, MediumTests.class })
|
||||||
|
public class TestMasterRegionInitialize extends MasterRegionTestBase {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestMasterRegionInitialize.class);
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpgrade() throws IOException {
|
||||||
|
Path rootDir = new Path(htu.getDataTestDir(), REGION_DIR_NAME);
|
||||||
|
Path tableDir =
|
||||||
|
CommonFSUtils.getTableDir(rootDir, region.region.getTableDescriptor().getTableName());
|
||||||
|
Path initializingFlag = new Path(tableDir, MasterRegion.INITIALIZING_FLAG);
|
||||||
|
Path initializedFlag = new Path(tableDir, MasterRegion.INITIALIZED_FLAG);
|
||||||
|
HRegionFileSystem hfs = region.region.getRegionFileSystem();
|
||||||
|
assertFalse(hfs.getFileSystem().exists(initializingFlag));
|
||||||
|
assertTrue(hfs.getFileSystem().exists(initializedFlag));
|
||||||
|
byte[] row = Bytes.toBytes("row");
|
||||||
|
byte[] cf = CF1;
|
||||||
|
byte[] cq = Bytes.toBytes("qual");
|
||||||
|
byte[] value = Bytes.toBytes("value");
|
||||||
|
region.update(r -> r.put(new Put(row).addColumn(cf, cq, value)));
|
||||||
|
assertEquals(FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, region.flush(true).getResult());
|
||||||
|
// delete initialized flag to simulate old implementation
|
||||||
|
hfs.getFileSystem().delete(initializedFlag, true);
|
||||||
|
FSTableDescriptors.deleteTableDescriptors(hfs.getFileSystem(), tableDir);
|
||||||
|
assertNull(FSTableDescriptors.getTableDescriptorFromFs(hfs.getFileSystem(), tableDir));
|
||||||
|
// reopen, with new file tracker
|
||||||
|
region.close(false);
|
||||||
|
htu.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL,
|
||||||
|
StoreFileTrackerFactory.Trackers.FILE.name());
|
||||||
|
createMasterRegion();
|
||||||
|
|
||||||
|
// make sure we successfully upgrade to new implementation without data loss
|
||||||
|
hfs = region.region.getRegionFileSystem();
|
||||||
|
assertFalse(hfs.getFileSystem().exists(initializingFlag));
|
||||||
|
assertTrue(hfs.getFileSystem().exists(initializedFlag));
|
||||||
|
TableDescriptor td = FSTableDescriptors.getTableDescriptorFromFs(hfs.getFileSystem(), tableDir);
|
||||||
|
assertEquals(StoreFileTrackerFactory.Trackers.FILE.name(),
|
||||||
|
td.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
|
||||||
|
assertArrayEquals(value, region.get(new Get(row)).getValue(cf, cq));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInitializingCleanup() throws IOException {
|
||||||
|
Path rootDir = new Path(htu.getDataTestDir(), REGION_DIR_NAME);
|
||||||
|
Path tableDir =
|
||||||
|
CommonFSUtils.getTableDir(rootDir, region.region.getTableDescriptor().getTableName());
|
||||||
|
Path initializingFlag = new Path(tableDir, MasterRegion.INITIALIZING_FLAG);
|
||||||
|
Path initializedFlag = new Path(tableDir, MasterRegion.INITIALIZED_FLAG);
|
||||||
|
HRegionFileSystem hfs = region.region.getRegionFileSystem();
|
||||||
|
assertFalse(hfs.getFileSystem().exists(initializingFlag));
|
||||||
|
assertTrue(hfs.getFileSystem().exists(initializedFlag));
|
||||||
|
byte[] row = Bytes.toBytes("row");
|
||||||
|
byte[] cf = CF1;
|
||||||
|
byte[] cq = Bytes.toBytes("qual");
|
||||||
|
byte[] value = Bytes.toBytes("value");
|
||||||
|
region.update(r -> r.put(new Put(row).addColumn(cf, cq, value)));
|
||||||
|
// delete initialized flag and touch a initializing flag, to simulate initializing in progress
|
||||||
|
hfs.getFileSystem().delete(initializedFlag, true);
|
||||||
|
if (!hfs.getFileSystem().mkdirs(initializingFlag)) {
|
||||||
|
throw new IOException("can not touch " + initializedFlag);
|
||||||
|
}
|
||||||
|
|
||||||
|
region.close(false);
|
||||||
|
createMasterRegion();
|
||||||
|
hfs = region.region.getRegionFileSystem();
|
||||||
|
assertFalse(hfs.getFileSystem().exists(initializingFlag));
|
||||||
|
assertTrue(hfs.getFileSystem().exists(initializedFlag));
|
||||||
|
|
||||||
|
// but the data should have been cleaned up
|
||||||
|
assertTrue(region.get(new Get(row)).isEmpty());
|
||||||
|
}
|
||||||
|
}
|
|
@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
|
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
||||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
@ -86,9 +87,11 @@ public class TestMasterRegionOnTwoFileSystems {
|
||||||
|
|
||||||
private static byte[] CQ = Bytes.toBytes("q");
|
private static byte[] CQ = Bytes.toBytes("q");
|
||||||
|
|
||||||
private static TableDescriptor TD =
|
private static TableDescriptor TD = TableDescriptorBuilder
|
||||||
TableDescriptorBuilder.newBuilder(TableName.valueOf("test:local"))
|
.newBuilder(TableName.valueOf("test:local"))
|
||||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build();
|
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF))
|
||||||
|
.setValue(StoreFileTrackerFactory.TRACKER_IMPL, StoreFileTrackerFactory.Trackers.DEFAULT.name())
|
||||||
|
.build();
|
||||||
|
|
||||||
private static int COMPACT_MIN = 4;
|
private static int COMPACT_MIN = 4;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue