HBASE-26187 Write straight into the store directory when Splitting an… (#3574)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
cacf4a86a6
commit
721cb96f8c
|
@ -582,34 +582,29 @@ public class MergeTableRegionsProcedure
|
||||||
*/
|
*/
|
||||||
private void createMergedRegion(final MasterProcedureEnv env) throws IOException {
|
private void createMergedRegion(final MasterProcedureEnv env) throws IOException {
|
||||||
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
|
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
|
||||||
final Path tabledir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
|
final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
|
||||||
final FileSystem fs = mfs.getFileSystem();
|
final FileSystem fs = mfs.getFileSystem();
|
||||||
HRegionFileSystem mergeRegionFs = null;
|
|
||||||
|
HRegionFileSystem mergeRegionFs = HRegionFileSystem.createRegionOnFileSystem(
|
||||||
|
env.getMasterConfiguration(), fs, tableDir, mergedRegion);
|
||||||
|
|
||||||
for (RegionInfo ri: this.regionsToMerge) {
|
for (RegionInfo ri: this.regionsToMerge) {
|
||||||
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
|
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
|
||||||
env.getMasterConfiguration(), fs, tabledir, ri, false);
|
env.getMasterConfiguration(), fs, tableDir, ri, false);
|
||||||
if (mergeRegionFs == null) {
|
mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion);
|
||||||
mergeRegionFs = regionFs;
|
|
||||||
mergeRegionFs.createMergesDir();
|
|
||||||
}
|
|
||||||
mergeStoreFiles(env, regionFs, mergeRegionFs.getMergesDir());
|
|
||||||
}
|
}
|
||||||
assert mergeRegionFs != null;
|
assert mergeRegionFs != null;
|
||||||
mergeRegionFs.commitMergedRegion(mergedRegion);
|
mergeRegionFs.commitMergedRegion();
|
||||||
|
|
||||||
// Prepare to create merged regions
|
// Prepare to create merged regions
|
||||||
env.getAssignmentManager().getRegionStates().
|
env.getAssignmentManager().getRegionStates().
|
||||||
getOrCreateRegionStateNode(mergedRegion).setState(State.MERGING_NEW);
|
getOrCreateRegionStateNode(mergedRegion).setState(State.MERGING_NEW);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private void mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
|
||||||
* Create reference file(s) to parent region hfiles in the <code>mergeDir</code>
|
HRegionFileSystem mergeRegionFs, RegionInfo mergedRegion) throws IOException {
|
||||||
* @param regionFs merge parent region file system
|
final TableDescriptor htd = env.getMasterServices().getTableDescriptors()
|
||||||
* @param mergeDir the temp directory in which we are accumulating references.
|
.get(mergedRegion.getTable());
|
||||||
*/
|
|
||||||
private void mergeStoreFiles(final MasterProcedureEnv env, final HRegionFileSystem regionFs,
|
|
||||||
final Path mergeDir) throws IOException {
|
|
||||||
final TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
|
|
||||||
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
|
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
|
||||||
String family = hcd.getNameAsString();
|
String family = hcd.getNameAsString();
|
||||||
final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
|
final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
|
||||||
|
@ -618,8 +613,8 @@ public class MergeTableRegionsProcedure
|
||||||
// Create reference file(s) to parent region file here in mergedDir.
|
// Create reference file(s) to parent region file here in mergedDir.
|
||||||
// As this procedure is running on master, use CacheConfig.DISABLED means
|
// As this procedure is running on master, use CacheConfig.DISABLED means
|
||||||
// don't cache any block.
|
// don't cache any block.
|
||||||
regionFs.mergeStoreFile(mergedRegion, family, new HStoreFile(
|
mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
|
||||||
storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED), mergeDir);
|
new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -372,7 +372,7 @@ public class SplitTableRegionProcedure
|
||||||
break;
|
break;
|
||||||
case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS:
|
case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS:
|
||||||
case SPLIT_TABLE_REGION_WRITE_MAX_SEQUENCE_ID_FILE:
|
case SPLIT_TABLE_REGION_WRITE_MAX_SEQUENCE_ID_FILE:
|
||||||
// Doing nothing, as re-open parent region would clean up daughter region directories.
|
deleteDaughterRegions(env);
|
||||||
break;
|
break;
|
||||||
case SPLIT_TABLE_REGIONS_CHECK_CLOSED_REGIONS:
|
case SPLIT_TABLE_REGIONS_CHECK_CLOSED_REGIONS:
|
||||||
// Doing nothing, in SPLIT_TABLE_REGION_CLOSE_PARENT_REGION,
|
// Doing nothing, in SPLIT_TABLE_REGION_CLOSE_PARENT_REGION,
|
||||||
|
@ -618,13 +618,13 @@ public class SplitTableRegionProcedure
|
||||||
final FileSystem fs = mfs.getFileSystem();
|
final FileSystem fs = mfs.getFileSystem();
|
||||||
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
|
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
|
||||||
env.getMasterConfiguration(), fs, tabledir, getParentRegion(), false);
|
env.getMasterConfiguration(), fs, tabledir, getParentRegion(), false);
|
||||||
|
|
||||||
regionFs.createSplitsDir(daughterOneRI, daughterTwoRI);
|
regionFs.createSplitsDir(daughterOneRI, daughterTwoRI);
|
||||||
|
|
||||||
Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs);
|
Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs);
|
||||||
|
|
||||||
assertReferenceFileCount(fs, expectedReferences.getFirst(),
|
assertReferenceFileCount(fs, expectedReferences.getFirst(),
|
||||||
regionFs.getSplitsDir(daughterOneRI));
|
regionFs.getSplitsDir(daughterOneRI));
|
||||||
//Move the files from the temporary .splits to the final /table/region directory
|
|
||||||
regionFs.commitDaughterRegion(daughterOneRI);
|
regionFs.commitDaughterRegion(daughterOneRI);
|
||||||
assertReferenceFileCount(fs, expectedReferences.getFirst(),
|
assertReferenceFileCount(fs, expectedReferences.getFirst(),
|
||||||
new Path(tabledir, daughterOneRI.getEncodedName()));
|
new Path(tabledir, daughterOneRI.getEncodedName()));
|
||||||
|
@ -636,6 +636,15 @@ public class SplitTableRegionProcedure
|
||||||
new Path(tabledir, daughterTwoRI.getEncodedName()));
|
new Path(tabledir, daughterTwoRI.getEncodedName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void deleteDaughterRegions(final MasterProcedureEnv env) throws IOException {
|
||||||
|
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
|
||||||
|
final Path tabledir = CommonFSUtils.getTableDir(mfs.getRootDir(), getTableName());
|
||||||
|
HRegionFileSystem.deleteRegionFromFileSystem(env.getMasterConfiguration(),
|
||||||
|
mfs.getFileSystem(), tabledir, daughterOneRI);
|
||||||
|
HRegionFileSystem.deleteRegionFromFileSystem(env.getMasterConfiguration(),
|
||||||
|
mfs.getFileSystem(), tabledir, daughterTwoRI);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create Split directory
|
* Create Split directory
|
||||||
* @param env MasterProcedureEnv
|
* @param env MasterProcedureEnv
|
||||||
|
@ -648,9 +657,9 @@ public class SplitTableRegionProcedure
|
||||||
// there's files to split. It then fires up everything, waits for
|
// there's files to split. It then fires up everything, waits for
|
||||||
// completion and finally checks for any exception
|
// completion and finally checks for any exception
|
||||||
//
|
//
|
||||||
// Note: splitStoreFiles creates daughter region dirs under the parent splits dir
|
// Note: From HBASE-26187, splitStoreFiles now creates daughter region dirs straight under the
|
||||||
// Nothing to unroll here if failure -- re-run createSplitsDir will
|
// table dir. In case of failure, the proc would go through this again, already existing
|
||||||
// clean this up.
|
// region dirs and split files would just be ignored, new split files should get created.
|
||||||
int nbFiles = 0;
|
int nbFiles = 0;
|
||||||
final Map<String, Collection<StoreFileInfo>> files =
|
final Map<String, Collection<StoreFileInfo>> files =
|
||||||
new HashMap<String, Collection<StoreFileInfo>>(htd.getColumnFamilyCount());
|
new HashMap<String, Collection<StoreFileInfo>>(htd.getColumnFamilyCount());
|
||||||
|
|
|
@ -182,9 +182,7 @@ public class CatalogJanitor extends ScheduledChore {
|
||||||
for (Map.Entry<RegionInfo, Result> e : mergedRegions.entrySet()) {
|
for (Map.Entry<RegionInfo, Result> e : mergedRegions.entrySet()) {
|
||||||
if (this.services.isInMaintenanceMode()) {
|
if (this.services.isInMaintenanceMode()) {
|
||||||
// Stop cleaning if the master is in maintenance mode
|
// Stop cleaning if the master is in maintenance mode
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("In maintenence mode, not cleaning");
|
||||||
LOG.debug("In maintenence mode, not cleaning");
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1023,16 +1023,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
fs.cleanupTempDir();
|
fs.cleanupTempDir();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.writestate.writesEnabled) {
|
|
||||||
status.setStatus("Cleaning up detritus from prior splits");
|
|
||||||
// Get rid of any splits or merges that were lost in-progress. Clean out
|
|
||||||
// these directories here on open. We may be opening a region that was
|
|
||||||
// being split but we crashed in the middle of it all.
|
|
||||||
LOG.debug("Cleaning up detritus for " + this.getRegionInfo().getEncodedName());
|
|
||||||
fs.cleanupAnySplitDetritus();
|
|
||||||
fs.cleanupMergesDir();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize split policy
|
// Initialize split policy
|
||||||
this.splitPolicy = RegionSplitPolicy.create(this, conf);
|
this.splitPolicy = RegionSplitPolicy.create(this, conf);
|
||||||
|
|
||||||
|
|
|
@ -572,49 +572,9 @@ public class HRegionFileSystem {
|
||||||
// ===========================================================================
|
// ===========================================================================
|
||||||
// Splits Helpers
|
// Splits Helpers
|
||||||
// ===========================================================================
|
// ===========================================================================
|
||||||
/** @return {@link Path} to the temp directory used during split operations */
|
|
||||||
Path getSplitsDir() {
|
|
||||||
return new Path(getRegionDir(), REGION_SPLITS_DIR);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Path getSplitsDir(final RegionInfo hri) {
|
public Path getSplitsDir(final RegionInfo hri) {
|
||||||
return new Path(getSplitsDir(), hri.getEncodedName());
|
return new Path(getTableDir(), hri.getEncodedName());
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Clean up any split detritus that may have been left around from previous split attempts.
|
|
||||||
*/
|
|
||||||
void cleanupSplitsDir() throws IOException {
|
|
||||||
deleteDir(getSplitsDir());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Clean up any split detritus that may have been left around from previous
|
|
||||||
* split attempts.
|
|
||||||
* Call this method on initial region deploy.
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
void cleanupAnySplitDetritus() throws IOException {
|
|
||||||
Path splitdir = this.getSplitsDir();
|
|
||||||
if (!fs.exists(splitdir)) return;
|
|
||||||
// Look at the splitdir. It could have the encoded names of the daughter
|
|
||||||
// regions we tried to make. See if the daughter regions actually got made
|
|
||||||
// out under the tabledir. If here under splitdir still, then the split did
|
|
||||||
// not complete. Try and do cleanup. This code WILL NOT catch the case
|
|
||||||
// where we successfully created daughter a but regionserver crashed during
|
|
||||||
// the creation of region b. In this case, there'll be an orphan daughter
|
|
||||||
// dir in the filesystem. TOOD: Fix.
|
|
||||||
FileStatus[] daughters = CommonFSUtils.listStatus(fs, splitdir, new FSUtils.DirFilter(fs));
|
|
||||||
if (daughters != null) {
|
|
||||||
for (FileStatus daughter: daughters) {
|
|
||||||
Path daughterDir = new Path(getTableDir(), daughter.getPath().getName());
|
|
||||||
if (fs.exists(daughterDir) && !deleteDir(daughterDir)) {
|
|
||||||
throw new IOException("Failed delete of " + daughterDir);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
cleanupSplitsDir();
|
|
||||||
LOG.info("Cleaned up old failed split transaction detritus: " + splitdir);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -638,47 +598,38 @@ public class HRegionFileSystem {
|
||||||
*/
|
*/
|
||||||
public Path commitDaughterRegion(final RegionInfo regionInfo)
|
public Path commitDaughterRegion(final RegionInfo regionInfo)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Path regionDir = new Path(this.tableDir, regionInfo.getEncodedName());
|
Path regionDir = this.getSplitsDir(regionInfo);
|
||||||
Path daughterTmpDir = this.getSplitsDir(regionInfo);
|
if (fs.exists(regionDir)) {
|
||||||
|
|
||||||
if (fs.exists(daughterTmpDir)) {
|
|
||||||
|
|
||||||
// Write HRI to a file in case we need to recover hbase:meta
|
// Write HRI to a file in case we need to recover hbase:meta
|
||||||
Path regionInfoFile = new Path(daughterTmpDir, REGION_INFO_FILE);
|
Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
|
||||||
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
|
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
|
||||||
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
|
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
|
||||||
|
|
||||||
// Move the daughter temp dir to the table dir
|
|
||||||
if (!rename(daughterTmpDir, regionDir)) {
|
|
||||||
throw new IOException("Unable to rename " + daughterTmpDir + " to " + regionDir);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return regionDir;
|
return regionDir;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create the region splits directory.
|
* Creates region split daughter directories under the table dir. If the daughter regions already
|
||||||
|
* exist, for example, in the case of a recovery from a previous failed split procedure, this
|
||||||
|
* method deletes the given region dir recursively, then recreates it again.
|
||||||
*/
|
*/
|
||||||
public void createSplitsDir(RegionInfo daughterA, RegionInfo daughterB) throws IOException {
|
public void createSplitsDir(RegionInfo daughterA, RegionInfo daughterB) throws IOException {
|
||||||
Path splitdir = getSplitsDir();
|
Path daughterADir = getSplitsDir(daughterA);
|
||||||
if (fs.exists(splitdir)) {
|
if (fs.exists(daughterADir) && !deleteDir(daughterADir)) {
|
||||||
LOG.info("The " + splitdir + " directory exists. Hence deleting it to recreate it");
|
throw new IOException("Failed deletion of " + daughterADir + " before creating them again.");
|
||||||
if (!deleteDir(splitdir)) {
|
|
||||||
throw new IOException("Failed deletion of " + splitdir + " before creating them again.");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// splitDir doesn't exists now. No need to do an exists() call for it.
|
if (!createDir(daughterADir)) {
|
||||||
if (!createDir(splitdir)) {
|
throw new IOException("Failed create of " + daughterADir);
|
||||||
throw new IOException("Failed create of " + splitdir);
|
|
||||||
}
|
}
|
||||||
Path daughterATmpDir = getSplitsDir(daughterA);
|
Path daughterBDir = getSplitsDir(daughterB);
|
||||||
if (!createDir(daughterATmpDir)) {
|
if (fs.exists(daughterBDir) && !deleteDir(daughterBDir)) {
|
||||||
throw new IOException("Failed create of " + daughterATmpDir);
|
throw new IOException("Failed deletion of " + daughterBDir + " before creating them again.");
|
||||||
|
|
||||||
}
|
}
|
||||||
Path daughterBTmpDir = getSplitsDir(daughterB);
|
if (!createDir(daughterBDir)) {
|
||||||
if (!createDir(daughterBTmpDir)) {
|
throw new IOException("Failed create of " + daughterBDir);
|
||||||
throw new IOException("Failed create of " + daughterBTmpDir);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -698,6 +649,19 @@ public class HRegionFileSystem {
|
||||||
*/
|
*/
|
||||||
public Path splitStoreFile(RegionInfo hri, String familyName, HStoreFile f, byte[] splitRow,
|
public Path splitStoreFile(RegionInfo hri, String familyName, HStoreFile f, byte[] splitRow,
|
||||||
boolean top, RegionSplitPolicy splitPolicy) throws IOException {
|
boolean top, RegionSplitPolicy splitPolicy) throws IOException {
|
||||||
|
Path splitDir = new Path(getSplitsDir(hri), familyName);
|
||||||
|
// Add the referred-to regions name as a dot separated suffix.
|
||||||
|
// See REF_NAME_REGEX regex above. The referred-to regions name is
|
||||||
|
// up in the path of the passed in <code>f</code> -- parentdir is family,
|
||||||
|
// then the directory above is the region name.
|
||||||
|
String parentRegionName = regionInfoForFs.getEncodedName();
|
||||||
|
// Write reference with same file id only with the other region name as
|
||||||
|
// suffix and into the new region location (under same family).
|
||||||
|
Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
|
||||||
|
if(fs.exists(p)){
|
||||||
|
LOG.warn("Found an already existing split file for {}. Assuming this is a recovery.", p);
|
||||||
|
return p;
|
||||||
|
}
|
||||||
if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) {
|
if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) {
|
||||||
// Check whether the split row lies in the range of the store file
|
// Check whether the split row lies in the range of the store file
|
||||||
// If it is outside the range, return directly.
|
// If it is outside the range, return directly.
|
||||||
|
@ -730,40 +694,20 @@ public class HRegionFileSystem {
|
||||||
f.closeStoreFile(f.getCacheConf() != null ? f.getCacheConf().shouldEvictOnClose() : true);
|
f.closeStoreFile(f.getCacheConf() != null ? f.getCacheConf().shouldEvictOnClose() : true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Path splitDir = new Path(getSplitsDir(hri), familyName);
|
|
||||||
// A reference to the bottom half of the hsf store file.
|
// A reference to the bottom half of the hsf store file.
|
||||||
Reference r =
|
Reference r =
|
||||||
top ? Reference.createTopReference(splitRow): Reference.createBottomReference(splitRow);
|
top ? Reference.createTopReference(splitRow): Reference.createBottomReference(splitRow);
|
||||||
// Add the referred-to regions name as a dot separated suffix.
|
|
||||||
// See REF_NAME_REGEX regex above. The referred-to regions name is
|
|
||||||
// up in the path of the passed in <code>f</code> -- parentdir is family,
|
|
||||||
// then the directory above is the region name.
|
|
||||||
String parentRegionName = regionInfoForFs.getEncodedName();
|
|
||||||
// Write reference with same file id only with the other region name as
|
|
||||||
// suffix and into the new region location (under same family).
|
|
||||||
Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
|
|
||||||
return r.write(fs, p);
|
return r.write(fs, p);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===========================================================================
|
// ===========================================================================
|
||||||
// Merge Helpers
|
// Merge Helpers
|
||||||
// ===========================================================================
|
// ===========================================================================
|
||||||
/** @return {@link Path} to the temp directory used during merge operations */
|
|
||||||
public Path getMergesDir() {
|
|
||||||
return new Path(getRegionDir(), REGION_MERGES_DIR);
|
|
||||||
}
|
|
||||||
|
|
||||||
Path getMergesDir(final RegionInfo hri) {
|
Path getMergesDir(final RegionInfo hri) {
|
||||||
return new Path(getMergesDir(), hri.getEncodedName());
|
return new Path(getTableDir(), hri.getEncodedName());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Clean up any merge detritus that may have been left around from previous merge attempts.
|
|
||||||
*/
|
|
||||||
void cleanupMergesDir() throws IOException {
|
|
||||||
deleteDir(getMergesDir());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove merged region
|
* Remove merged region
|
||||||
|
@ -787,87 +731,47 @@ public class HRegionFileSystem {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create the region merges directory, a temporary directory to accumulate
|
* Write out a merge reference under the given merges directory.
|
||||||
* merges in.
|
* @param mergingRegion {@link RegionInfo} for one of the regions being merged.
|
||||||
* @throws IOException If merges dir already exists or we fail to create it.
|
|
||||||
* @see HRegionFileSystem#cleanupMergesDir()
|
|
||||||
*/
|
|
||||||
public void createMergesDir() throws IOException {
|
|
||||||
Path mergesdir = getMergesDir();
|
|
||||||
if (fs.exists(mergesdir)) {
|
|
||||||
LOG.info("{} directory exists. Deleting it to recreate it anew", mergesdir);
|
|
||||||
if (!fs.delete(mergesdir, true)) {
|
|
||||||
throw new IOException("Failed deletion of " + mergesdir + " before recreate.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!mkdirs(fs, conf, mergesdir)) {
|
|
||||||
throw new IOException("Failed create of " + mergesdir);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Write out a merge reference under the given merges directory. Package local
|
|
||||||
* so it doesnt leak out of regionserver.
|
|
||||||
* @param mergedRegion {@link RegionInfo} of the merged region
|
|
||||||
* @param familyName Column Family Name
|
* @param familyName Column Family Name
|
||||||
* @param f File to create reference.
|
* @param f File to create reference.
|
||||||
* @param mergedDir
|
|
||||||
* @return Path to created reference.
|
* @return Path to created reference.
|
||||||
* @throws IOException
|
* @throws IOException if the merge write fails.
|
||||||
*/
|
*/
|
||||||
public Path mergeStoreFile(RegionInfo mergedRegion, String familyName, HStoreFile f,
|
public Path mergeStoreFile(RegionInfo mergingRegion, String familyName, HStoreFile f)
|
||||||
Path mergedDir) throws IOException {
|
throws IOException {
|
||||||
Path referenceDir = new Path(new Path(mergedDir,
|
Path referenceDir = new Path(getMergesDir(regionInfoForFs), familyName);
|
||||||
mergedRegion.getEncodedName()), familyName);
|
|
||||||
// A whole reference to the store file.
|
// A whole reference to the store file.
|
||||||
Reference r = Reference.createTopReference(regionInfoForFs.getStartKey());
|
Reference r = Reference.createTopReference(mergingRegion.getStartKey());
|
||||||
// Add the referred-to regions name as a dot separated suffix.
|
// Add the referred-to regions name as a dot separated suffix.
|
||||||
// See REF_NAME_REGEX regex above. The referred-to regions name is
|
// See REF_NAME_REGEX regex above. The referred-to regions name is
|
||||||
// up in the path of the passed in <code>f</code> -- parentdir is family,
|
// up in the path of the passed in <code>f</code> -- parentdir is family,
|
||||||
// then the directory above is the region name.
|
// then the directory above is the region name.
|
||||||
String mergingRegionName = regionInfoForFs.getEncodedName();
|
String mergingRegionName = mergingRegion.getEncodedName();
|
||||||
// Write reference with same file id only with the other region name as
|
// Write reference with same file id only with the other region name as
|
||||||
// suffix and into the new region location (under same family).
|
// suffix and into the new region location (under same family).
|
||||||
Path p = new Path(referenceDir, f.getPath().getName() + "."
|
Path p = new Path(referenceDir, f.getPath().getName() + "."
|
||||||
+ mergingRegionName);
|
+ mergingRegionName);
|
||||||
return r.write(fs, p);
|
return r.write(fs, p);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Commit a merged region, moving it from the merges temporary directory to
|
* Commit a merged region, making it ready for use.
|
||||||
* the proper location in the filesystem.
|
|
||||||
* @param mergedRegionInfo merged region {@link RegionInfo}
|
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void commitMergedRegion(final RegionInfo mergedRegionInfo) throws IOException {
|
public void commitMergedRegion() throws IOException {
|
||||||
Path regionDir = new Path(this.tableDir, mergedRegionInfo.getEncodedName());
|
Path regionDir = getMergesDir(regionInfoForFs);
|
||||||
Path mergedRegionTmpDir = this.getMergesDir(mergedRegionInfo);
|
if (regionDir != null && fs.exists(regionDir)) {
|
||||||
// Move the tmp dir to the expected location
|
|
||||||
if (mergedRegionTmpDir != null && fs.exists(mergedRegionTmpDir)) {
|
|
||||||
|
|
||||||
// Write HRI to a file in case we need to recover hbase:meta
|
// Write HRI to a file in case we need to recover hbase:meta
|
||||||
Path regionInfoFile = new Path(mergedRegionTmpDir, REGION_INFO_FILE);
|
Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
|
||||||
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
|
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
|
||||||
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
|
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
|
||||||
|
|
||||||
if (!fs.rename(mergedRegionTmpDir, regionDir)) {
|
|
||||||
throw new IOException("Unable to rename " + mergedRegionTmpDir + " to "
|
|
||||||
+ regionDir);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===========================================================================
|
// ===========================================================================
|
||||||
// Create/Open/Delete Helpers
|
// Create/Open/Delete Helpers
|
||||||
// ===========================================================================
|
// ===========================================================================
|
||||||
/**
|
|
||||||
* Log the current state of the region
|
|
||||||
* @param LOG log to output information
|
|
||||||
* @throws IOException if an unexpected exception occurs
|
|
||||||
*/
|
|
||||||
void logFileSystemState(final Logger LOG) throws IOException {
|
|
||||||
CommonFSUtils.logFileSystemState(fs, this.getRegionDir(), LOG);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param hri
|
* @param hri
|
||||||
|
@ -1058,8 +962,6 @@ public class HRegionFileSystem {
|
||||||
if (!readOnly) {
|
if (!readOnly) {
|
||||||
// Cleanup temporary directories
|
// Cleanup temporary directories
|
||||||
regionFs.cleanupTempDir();
|
regionFs.cleanupTempDir();
|
||||||
regionFs.cleanupSplitsDir();
|
|
||||||
regionFs.cleanupMergesDir();
|
|
||||||
|
|
||||||
// If it doesn't exists, Write HRI to a file, in case we need to recover hbase:meta
|
// If it doesn't exists, Write HRI to a file, in case we need to recover hbase:meta
|
||||||
// Only create HRI if we are the default replica
|
// Only create HRI if we are the default replica
|
||||||
|
|
|
@ -0,0 +1,233 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
|
||||||
|
|
||||||
|
@Category({RegionServerTests.class, LargeTests.class})
|
||||||
|
public class TestDirectStoreSplitsMerges {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestDirectStoreSplitsMerges.class);
|
||||||
|
|
||||||
|
private static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
|
||||||
|
|
||||||
|
public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TestName name = new TestName();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setup() throws Exception {
|
||||||
|
TEST_UTIL.startMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void after() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSplitStoreDir() throws Exception {
|
||||||
|
TableName table = TableName.valueOf(name.getMethodName());
|
||||||
|
TEST_UTIL.createTable(table, FAMILY_NAME);
|
||||||
|
//first put some data in order to have a store file created
|
||||||
|
putThreeRowsAndFlush(table);
|
||||||
|
HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
|
||||||
|
HRegionFileSystem regionFS = region.getStores().get(0).getRegionFileSystem();
|
||||||
|
RegionInfo daughterA =
|
||||||
|
RegionInfoBuilder.newBuilder(table).setStartKey(region.getRegionInfo().getStartKey()).
|
||||||
|
setEndKey(Bytes.toBytes("002")).setSplit(false)
|
||||||
|
.setRegionId(region.getRegionInfo().getRegionId() +
|
||||||
|
EnvironmentEdgeManager.currentTime()).build();
|
||||||
|
HStoreFile file = (HStoreFile) region.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
|
||||||
|
Path result = regionFS
|
||||||
|
.splitStoreFile(daughterA, Bytes.toString(FAMILY_NAME), file,
|
||||||
|
Bytes.toBytes("002"), false, region.getSplitPolicy());
|
||||||
|
//asserts the reference file naming is correct
|
||||||
|
validateResultingFile(region.getRegionInfo().getEncodedName(), result);
|
||||||
|
//Additionally check if split region dir was created directly under table dir, not on .tmp
|
||||||
|
Path resultGreatGrandParent = result.getParent().getParent().getParent();
|
||||||
|
assertEquals(regionFS.getTableDir().getName(), resultGreatGrandParent.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMergeStoreFile() throws Exception {
|
||||||
|
TableName table = TableName.valueOf(name.getMethodName());
|
||||||
|
TEST_UTIL.createTable(table, FAMILY_NAME);
|
||||||
|
//splitting the table first
|
||||||
|
TEST_UTIL.getAdmin().split(table, Bytes.toBytes("002"));
|
||||||
|
//Add data and flush to create files in the two different regions
|
||||||
|
putThreeRowsAndFlush(table);
|
||||||
|
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
|
||||||
|
HRegion first = regions.get(0);
|
||||||
|
HRegion second = regions.get(1);
|
||||||
|
HRegionFileSystem regionFS = first.getRegionFileSystem();
|
||||||
|
|
||||||
|
RegionInfo mergeResult =
|
||||||
|
RegionInfoBuilder.newBuilder(table).setStartKey(first.getRegionInfo().getStartKey())
|
||||||
|
.setEndKey(second.getRegionInfo().getEndKey()).setSplit(false)
|
||||||
|
.setRegionId(first.getRegionInfo().getRegionId() +
|
||||||
|
EnvironmentEdgeManager.currentTime()).build();
|
||||||
|
|
||||||
|
HRegionFileSystem mergeRegionFs = HRegionFileSystem
|
||||||
|
.createRegionOnFileSystem(TEST_UTIL.getHBaseCluster().getMaster().getConfiguration(),
|
||||||
|
regionFS.getFileSystem(), regionFS.getTableDir(), mergeResult);
|
||||||
|
|
||||||
|
//merge file from first region
|
||||||
|
HStoreFile file = (HStoreFile) first.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
|
||||||
|
mergeFileFromRegion(mergeRegionFs, first, file);
|
||||||
|
//merge file from second region
|
||||||
|
file = (HStoreFile) second.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
|
||||||
|
mergeFileFromRegion(mergeRegionFs, second, file);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCommitDaughterRegionNoFiles() throws Exception {
|
||||||
|
TableName table = TableName.valueOf(name.getMethodName());
|
||||||
|
TEST_UTIL.createTable(table, FAMILY_NAME);
|
||||||
|
HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
|
||||||
|
HRegionFileSystem regionFS = region.getStores().get(0).getRegionFileSystem();
|
||||||
|
RegionInfo daughterA =
|
||||||
|
RegionInfoBuilder.newBuilder(table).setStartKey(region.getRegionInfo().
|
||||||
|
getStartKey()).setEndKey(Bytes.toBytes("002")).setSplit(false).
|
||||||
|
setRegionId(region.getRegionInfo().getRegionId() +
|
||||||
|
EnvironmentEdgeManager.currentTime()).build();
|
||||||
|
Path splitDir = regionFS.getSplitsDir(daughterA);
|
||||||
|
Path result = regionFS.commitDaughterRegion(daughterA);
|
||||||
|
assertEquals(splitDir, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCommitDaughterRegionWithFiles() throws Exception {
|
||||||
|
TableName table = TableName.valueOf(name.getMethodName());
|
||||||
|
TEST_UTIL.createTable(table, FAMILY_NAME);
|
||||||
|
//first put some data in order to have a store file created
|
||||||
|
putThreeRowsAndFlush(table);
|
||||||
|
HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
|
||||||
|
HRegionFileSystem regionFS = region.getStores().get(0).getRegionFileSystem();
|
||||||
|
RegionInfo daughterA =
|
||||||
|
RegionInfoBuilder.newBuilder(table).setStartKey(region.getRegionInfo().getStartKey()).
|
||||||
|
setEndKey(Bytes.toBytes("002")).setSplit(false).
|
||||||
|
setRegionId(region.getRegionInfo().getRegionId() +
|
||||||
|
EnvironmentEdgeManager.currentTime()).build();
|
||||||
|
RegionInfo daughterB = RegionInfoBuilder.newBuilder(table).setStartKey(Bytes.toBytes("002"))
|
||||||
|
.setEndKey(region.getRegionInfo().getEndKey()).setSplit(false)
|
||||||
|
.setRegionId(region.getRegionInfo().getRegionId()).build();
|
||||||
|
Path splitDirA = regionFS.getSplitsDir(daughterA);
|
||||||
|
Path splitDirB = regionFS.getSplitsDir(daughterB);
|
||||||
|
HStoreFile file = (HStoreFile) region.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
|
||||||
|
regionFS
|
||||||
|
.splitStoreFile(daughterA, Bytes.toString(FAMILY_NAME), file,
|
||||||
|
Bytes.toBytes("002"), false, region.getSplitPolicy());
|
||||||
|
regionFS
|
||||||
|
.splitStoreFile(daughterB, Bytes.toString(FAMILY_NAME), file,
|
||||||
|
Bytes.toBytes("002"), true, region.getSplitPolicy());
|
||||||
|
Path resultA = regionFS.commitDaughterRegion(daughterA);
|
||||||
|
Path resultB = regionFS.commitDaughterRegion(daughterB);
|
||||||
|
assertEquals(splitDirA, resultA);
|
||||||
|
assertEquals(splitDirB, resultB);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCommitMergedRegion() throws Exception {
|
||||||
|
TableName table = TableName.valueOf(name.getMethodName());
|
||||||
|
TEST_UTIL.createTable(table, FAMILY_NAME);
|
||||||
|
//splitting the table first
|
||||||
|
TEST_UTIL.getAdmin().split(table, Bytes.toBytes("002"));
|
||||||
|
//Add data and flush to create files in the two different regions
|
||||||
|
putThreeRowsAndFlush(table);
|
||||||
|
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
|
||||||
|
HRegion first = regions.get(0);
|
||||||
|
HRegion second = regions.get(1);
|
||||||
|
HRegionFileSystem regionFS = first.getRegionFileSystem();
|
||||||
|
|
||||||
|
RegionInfo mergeResult =
|
||||||
|
RegionInfoBuilder.newBuilder(table).setStartKey(first.getRegionInfo().getStartKey())
|
||||||
|
.setEndKey(second.getRegionInfo().getEndKey()).setSplit(false)
|
||||||
|
.setRegionId(first.getRegionInfo().getRegionId() +
|
||||||
|
EnvironmentEdgeManager.currentTime()).build();
|
||||||
|
|
||||||
|
HRegionFileSystem mergeRegionFs = HRegionFileSystem
|
||||||
|
.createRegionOnFileSystem(TEST_UTIL.getHBaseCluster().getMaster().getConfiguration(),
|
||||||
|
regionFS.getFileSystem(), regionFS.getTableDir(), mergeResult);
|
||||||
|
|
||||||
|
//merge file from first region
|
||||||
|
HStoreFile file = (HStoreFile) first.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
|
||||||
|
mergeFileFromRegion(mergeRegionFs, first, file);
|
||||||
|
//merge file from second region
|
||||||
|
file = (HStoreFile) second.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
|
||||||
|
mergeFileFromRegion(mergeRegionFs, second, file);
|
||||||
|
mergeRegionFs.commitMergedRegion();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mergeFileFromRegion(HRegionFileSystem regionFS, HRegion regionToMerge,
|
||||||
|
HStoreFile file) throws IOException {
|
||||||
|
Path mergedFile = regionFS.mergeStoreFile(regionToMerge.getRegionInfo(),
|
||||||
|
Bytes.toString(FAMILY_NAME), file);
|
||||||
|
validateResultingFile(regionToMerge.getRegionInfo().getEncodedName(), mergedFile);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void validateResultingFile(String originalRegion, Path result){
|
||||||
|
assertEquals(originalRegion, result.getName().split("\\.")[1]);
|
||||||
|
//asserts we are under the cf directory
|
||||||
|
Path resultParent = result.getParent();
|
||||||
|
assertEquals(Bytes.toString(FAMILY_NAME), resultParent.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void putThreeRowsAndFlush(TableName table) throws IOException {
|
||||||
|
Table tbl = TEST_UTIL.getConnection().getTable(table);
|
||||||
|
Put put = new Put(Bytes.toBytes("001"));
|
||||||
|
byte[] qualifier = Bytes.toBytes("1");
|
||||||
|
put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(1));
|
||||||
|
tbl.put(put);
|
||||||
|
put = new Put(Bytes.toBytes("002"));
|
||||||
|
put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(2));
|
||||||
|
tbl.put(put);
|
||||||
|
put = new Put(Bytes.toBytes("003"));
|
||||||
|
put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(2));
|
||||||
|
tbl.put(put);
|
||||||
|
TEST_UTIL.flush(table);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue