Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
4ccd215354
commit
30c20d7627
|
@ -583,34 +583,29 @@ public class MergeTableRegionsProcedure
|
|||
*/
|
||||
private void createMergedRegion(final MasterProcedureEnv env) throws IOException {
|
||||
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
|
||||
final Path tabledir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
|
||||
final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
|
||||
final FileSystem fs = mfs.getFileSystem();
|
||||
HRegionFileSystem mergeRegionFs = null;
|
||||
|
||||
HRegionFileSystem mergeRegionFs = HRegionFileSystem.createRegionOnFileSystem(
|
||||
env.getMasterConfiguration(), fs, tableDir, mergedRegion);
|
||||
|
||||
for (RegionInfo ri: this.regionsToMerge) {
|
||||
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
|
||||
env.getMasterConfiguration(), fs, tabledir, ri, false);
|
||||
if (mergeRegionFs == null) {
|
||||
mergeRegionFs = regionFs;
|
||||
mergeRegionFs.createMergesDir();
|
||||
}
|
||||
mergeStoreFiles(env, regionFs, mergeRegionFs.getMergesDir());
|
||||
env.getMasterConfiguration(), fs, tableDir, ri, false);
|
||||
mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion);
|
||||
}
|
||||
assert mergeRegionFs != null;
|
||||
mergeRegionFs.commitMergedRegion(mergedRegion);
|
||||
mergeRegionFs.commitMergedRegion();
|
||||
|
||||
// Prepare to create merged regions
|
||||
env.getAssignmentManager().getRegionStates().
|
||||
getOrCreateRegionStateNode(mergedRegion).setState(State.MERGING_NEW);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create reference file(s) to parent region hfiles in the <code>mergeDir</code>
|
||||
* @param regionFs merge parent region file system
|
||||
* @param mergeDir the temp directory in which we are accumulating references.
|
||||
*/
|
||||
private void mergeStoreFiles(final MasterProcedureEnv env, final HRegionFileSystem regionFs,
|
||||
final Path mergeDir) throws IOException {
|
||||
final TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
|
||||
private void mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
|
||||
HRegionFileSystem mergeRegionFs, RegionInfo mergedRegion) throws IOException {
|
||||
final TableDescriptor htd = env.getMasterServices().getTableDescriptors()
|
||||
.get(mergedRegion.getTable());
|
||||
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
|
||||
String family = hcd.getNameAsString();
|
||||
final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
|
||||
|
@ -619,8 +614,8 @@ public class MergeTableRegionsProcedure
|
|||
// Create reference file(s) to parent region file here in mergedDir.
|
||||
// As this procedure is running on master, use CacheConfig.DISABLED means
|
||||
// don't cache any block.
|
||||
regionFs.mergeStoreFile(mergedRegion, family, new HStoreFile(
|
||||
storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED), mergeDir);
|
||||
mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
|
||||
new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -372,7 +372,7 @@ public class SplitTableRegionProcedure
|
|||
break;
|
||||
case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS:
|
||||
case SPLIT_TABLE_REGION_WRITE_MAX_SEQUENCE_ID_FILE:
|
||||
// Doing nothing, as re-open parent region would clean up daughter region directories.
|
||||
deleteDaughterRegions(env);
|
||||
break;
|
||||
case SPLIT_TABLE_REGIONS_CHECK_CLOSED_REGIONS:
|
||||
// Doing nothing, in SPLIT_TABLE_REGION_CLOSE_PARENT_REGION,
|
||||
|
@ -618,13 +618,13 @@ public class SplitTableRegionProcedure
|
|||
final FileSystem fs = mfs.getFileSystem();
|
||||
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
|
||||
env.getMasterConfiguration(), fs, tabledir, getParentRegion(), false);
|
||||
|
||||
regionFs.createSplitsDir(daughterOneRI, daughterTwoRI);
|
||||
|
||||
Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs);
|
||||
|
||||
assertReferenceFileCount(fs, expectedReferences.getFirst(),
|
||||
regionFs.getSplitsDir(daughterOneRI));
|
||||
//Move the files from the temporary .splits to the final /table/region directory
|
||||
regionFs.commitDaughterRegion(daughterOneRI);
|
||||
assertReferenceFileCount(fs, expectedReferences.getFirst(),
|
||||
new Path(tabledir, daughterOneRI.getEncodedName()));
|
||||
|
@ -636,6 +636,15 @@ public class SplitTableRegionProcedure
|
|||
new Path(tabledir, daughterTwoRI.getEncodedName()));
|
||||
}
|
||||
|
||||
private void deleteDaughterRegions(final MasterProcedureEnv env) throws IOException {
|
||||
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
|
||||
final Path tabledir = CommonFSUtils.getTableDir(mfs.getRootDir(), getTableName());
|
||||
HRegionFileSystem.deleteRegionFromFileSystem(env.getMasterConfiguration(),
|
||||
mfs.getFileSystem(), tabledir, daughterOneRI);
|
||||
HRegionFileSystem.deleteRegionFromFileSystem(env.getMasterConfiguration(),
|
||||
mfs.getFileSystem(), tabledir, daughterTwoRI);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create Split directory
|
||||
* @param env MasterProcedureEnv
|
||||
|
@ -648,9 +657,9 @@ public class SplitTableRegionProcedure
|
|||
// there's files to split. It then fires up everything, waits for
|
||||
// completion and finally checks for any exception
|
||||
//
|
||||
// Note: splitStoreFiles creates daughter region dirs under the parent splits dir
|
||||
// Nothing to unroll here if failure -- re-run createSplitsDir will
|
||||
// clean this up.
|
||||
// Note: From HBASE-26187, splitStoreFiles now creates daughter region dirs straight under the
|
||||
// table dir. In case of failure, the proc would go through this again, already existing
|
||||
// region dirs and split files would just be ignored, new split files should get created.
|
||||
int nbFiles = 0;
|
||||
final Map<String, Collection<StoreFileInfo>> files =
|
||||
new HashMap<String, Collection<StoreFileInfo>>(htd.getColumnFamilyCount());
|
||||
|
|
|
@ -181,9 +181,7 @@ public class CatalogJanitor extends ScheduledChore {
|
|||
for (Map.Entry<RegionInfo, Result> e : mergedRegions.entrySet()) {
|
||||
if (this.services.isInMaintenanceMode()) {
|
||||
// Stop cleaning if the master is in maintenance mode
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("In maintenence mode, not cleaning");
|
||||
}
|
||||
LOG.debug("In maintenence mode, not cleaning");
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -1050,15 +1050,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
fs.cleanupTempDir();
|
||||
}
|
||||
|
||||
if (this.writestate.writesEnabled) {
|
||||
status.setStatus("Cleaning up detritus from prior splits");
|
||||
// Get rid of any splits or merges that were lost in-progress. Clean out
|
||||
// these directories here on open. We may be opening a region that was
|
||||
// being split but we crashed in the middle of it all.
|
||||
fs.cleanupAnySplitDetritus();
|
||||
fs.cleanupMergesDir();
|
||||
}
|
||||
|
||||
// Initialize split policy
|
||||
this.splitPolicy = RegionSplitPolicy.create(this, conf);
|
||||
|
||||
|
|
|
@ -572,49 +572,9 @@ public class HRegionFileSystem {
|
|||
// ===========================================================================
|
||||
// Splits Helpers
|
||||
// ===========================================================================
|
||||
/** @return {@link Path} to the temp directory used during split operations */
|
||||
Path getSplitsDir() {
|
||||
return new Path(getRegionDir(), REGION_SPLITS_DIR);
|
||||
}
|
||||
|
||||
public Path getSplitsDir(final RegionInfo hri) {
|
||||
return new Path(getSplitsDir(), hri.getEncodedName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up any split detritus that may have been left around from previous split attempts.
|
||||
*/
|
||||
void cleanupSplitsDir() throws IOException {
|
||||
deleteDir(getSplitsDir());
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up any split detritus that may have been left around from previous
|
||||
* split attempts.
|
||||
* Call this method on initial region deploy.
|
||||
* @throws IOException
|
||||
*/
|
||||
void cleanupAnySplitDetritus() throws IOException {
|
||||
Path splitdir = this.getSplitsDir();
|
||||
if (!fs.exists(splitdir)) return;
|
||||
// Look at the splitdir. It could have the encoded names of the daughter
|
||||
// regions we tried to make. See if the daughter regions actually got made
|
||||
// out under the tabledir. If here under splitdir still, then the split did
|
||||
// not complete. Try and do cleanup. This code WILL NOT catch the case
|
||||
// where we successfully created daughter a but regionserver crashed during
|
||||
// the creation of region b. In this case, there'll be an orphan daughter
|
||||
// dir in the filesystem. TOOD: Fix.
|
||||
FileStatus[] daughters = CommonFSUtils.listStatus(fs, splitdir, new FSUtils.DirFilter(fs));
|
||||
if (daughters != null) {
|
||||
for (FileStatus daughter: daughters) {
|
||||
Path daughterDir = new Path(getTableDir(), daughter.getPath().getName());
|
||||
if (fs.exists(daughterDir) && !deleteDir(daughterDir)) {
|
||||
throw new IOException("Failed delete of " + daughterDir);
|
||||
}
|
||||
}
|
||||
}
|
||||
cleanupSplitsDir();
|
||||
LOG.info("Cleaned up old failed split transaction detritus: " + splitdir);
|
||||
return new Path(getTableDir(), hri.getEncodedName());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -638,47 +598,38 @@ public class HRegionFileSystem {
|
|||
*/
|
||||
public Path commitDaughterRegion(final RegionInfo regionInfo)
|
||||
throws IOException {
|
||||
Path regionDir = new Path(this.tableDir, regionInfo.getEncodedName());
|
||||
Path daughterTmpDir = this.getSplitsDir(regionInfo);
|
||||
|
||||
if (fs.exists(daughterTmpDir)) {
|
||||
|
||||
Path regionDir = this.getSplitsDir(regionInfo);
|
||||
if (fs.exists(regionDir)) {
|
||||
// Write HRI to a file in case we need to recover hbase:meta
|
||||
Path regionInfoFile = new Path(daughterTmpDir, REGION_INFO_FILE);
|
||||
Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
|
||||
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
|
||||
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
|
||||
|
||||
// Move the daughter temp dir to the table dir
|
||||
if (!rename(daughterTmpDir, regionDir)) {
|
||||
throw new IOException("Unable to rename " + daughterTmpDir + " to " + regionDir);
|
||||
}
|
||||
}
|
||||
|
||||
return regionDir;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the region splits directory.
|
||||
* Creates region split daughter directories under the table dir. If the daughter regions already
|
||||
* exist, for example, in the case of a recovery from a previous failed split procedure, this
|
||||
* method deletes the given region dir recursively, then recreates it again.
|
||||
*/
|
||||
public void createSplitsDir(RegionInfo daughterA, RegionInfo daughterB) throws IOException {
|
||||
Path splitdir = getSplitsDir();
|
||||
if (fs.exists(splitdir)) {
|
||||
LOG.info("The " + splitdir + " directory exists. Hence deleting it to recreate it");
|
||||
if (!deleteDir(splitdir)) {
|
||||
throw new IOException("Failed deletion of " + splitdir + " before creating them again.");
|
||||
}
|
||||
Path daughterADir = getSplitsDir(daughterA);
|
||||
if (fs.exists(daughterADir) && !deleteDir(daughterADir)) {
|
||||
throw new IOException("Failed deletion of " + daughterADir + " before creating them again.");
|
||||
|
||||
}
|
||||
// splitDir doesn't exists now. No need to do an exists() call for it.
|
||||
if (!createDir(splitdir)) {
|
||||
throw new IOException("Failed create of " + splitdir);
|
||||
if (!createDir(daughterADir)) {
|
||||
throw new IOException("Failed create of " + daughterADir);
|
||||
}
|
||||
Path daughterATmpDir = getSplitsDir(daughterA);
|
||||
if (!createDir(daughterATmpDir)) {
|
||||
throw new IOException("Failed create of " + daughterATmpDir);
|
||||
Path daughterBDir = getSplitsDir(daughterB);
|
||||
if (fs.exists(daughterBDir) && !deleteDir(daughterBDir)) {
|
||||
throw new IOException("Failed deletion of " + daughterBDir + " before creating them again.");
|
||||
|
||||
}
|
||||
Path daughterBTmpDir = getSplitsDir(daughterB);
|
||||
if (!createDir(daughterBTmpDir)) {
|
||||
throw new IOException("Failed create of " + daughterBTmpDir);
|
||||
if (!createDir(daughterBDir)) {
|
||||
throw new IOException("Failed create of " + daughterBDir);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -698,6 +649,19 @@ public class HRegionFileSystem {
|
|||
*/
|
||||
public Path splitStoreFile(RegionInfo hri, String familyName, HStoreFile f, byte[] splitRow,
|
||||
boolean top, RegionSplitPolicy splitPolicy) throws IOException {
|
||||
Path splitDir = new Path(getSplitsDir(hri), familyName);
|
||||
// Add the referred-to regions name as a dot separated suffix.
|
||||
// See REF_NAME_REGEX regex above. The referred-to regions name is
|
||||
// up in the path of the passed in <code>f</code> -- parentdir is family,
|
||||
// then the directory above is the region name.
|
||||
String parentRegionName = regionInfoForFs.getEncodedName();
|
||||
// Write reference with same file id only with the other region name as
|
||||
// suffix and into the new region location (under same family).
|
||||
Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
|
||||
if(fs.exists(p)){
|
||||
LOG.warn("Found an already existing split file for {}. Assuming this is a recovery.", p);
|
||||
return p;
|
||||
}
|
||||
if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) {
|
||||
// Check whether the split row lies in the range of the store file
|
||||
// If it is outside the range, return directly.
|
||||
|
@ -730,40 +694,20 @@ public class HRegionFileSystem {
|
|||
f.closeStoreFile(f.getCacheConf() != null ? f.getCacheConf().shouldEvictOnClose() : true);
|
||||
}
|
||||
}
|
||||
|
||||
Path splitDir = new Path(getSplitsDir(hri), familyName);
|
||||
// A reference to the bottom half of the hsf store file.
|
||||
Reference r =
|
||||
top ? Reference.createTopReference(splitRow): Reference.createBottomReference(splitRow);
|
||||
// Add the referred-to regions name as a dot separated suffix.
|
||||
// See REF_NAME_REGEX regex above. The referred-to regions name is
|
||||
// up in the path of the passed in <code>f</code> -- parentdir is family,
|
||||
// then the directory above is the region name.
|
||||
String parentRegionName = regionInfoForFs.getEncodedName();
|
||||
// Write reference with same file id only with the other region name as
|
||||
// suffix and into the new region location (under same family).
|
||||
Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
|
||||
return r.write(fs, p);
|
||||
}
|
||||
|
||||
// ===========================================================================
|
||||
// Merge Helpers
|
||||
// ===========================================================================
|
||||
/** @return {@link Path} to the temp directory used during merge operations */
|
||||
public Path getMergesDir() {
|
||||
return new Path(getRegionDir(), REGION_MERGES_DIR);
|
||||
}
|
||||
|
||||
Path getMergesDir(final RegionInfo hri) {
|
||||
return new Path(getMergesDir(), hri.getEncodedName());
|
||||
return new Path(getTableDir(), hri.getEncodedName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up any merge detritus that may have been left around from previous merge attempts.
|
||||
*/
|
||||
void cleanupMergesDir() throws IOException {
|
||||
deleteDir(getMergesDir());
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove merged region
|
||||
|
@ -787,87 +731,47 @@ public class HRegionFileSystem {
|
|||
}
|
||||
|
||||
/**
|
||||
* Create the region merges directory, a temporary directory to accumulate
|
||||
* merges in.
|
||||
* @throws IOException If merges dir already exists or we fail to create it.
|
||||
* @see HRegionFileSystem#cleanupMergesDir()
|
||||
*/
|
||||
public void createMergesDir() throws IOException {
|
||||
Path mergesdir = getMergesDir();
|
||||
if (fs.exists(mergesdir)) {
|
||||
LOG.info("{} directory exists. Deleting it to recreate it anew", mergesdir);
|
||||
if (!fs.delete(mergesdir, true)) {
|
||||
throw new IOException("Failed deletion of " + mergesdir + " before recreate.");
|
||||
}
|
||||
}
|
||||
if (!mkdirs(fs, conf, mergesdir)) {
|
||||
throw new IOException("Failed create of " + mergesdir);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write out a merge reference under the given merges directory. Package local
|
||||
* so it doesnt leak out of regionserver.
|
||||
* @param mergedRegion {@link RegionInfo} of the merged region
|
||||
* Write out a merge reference under the given merges directory.
|
||||
* @param mergingRegion {@link RegionInfo} for one of the regions being merged.
|
||||
* @param familyName Column Family Name
|
||||
* @param f File to create reference.
|
||||
* @param mergedDir
|
||||
* @return Path to created reference.
|
||||
* @throws IOException
|
||||
* @throws IOException if the merge write fails.
|
||||
*/
|
||||
public Path mergeStoreFile(RegionInfo mergedRegion, String familyName, HStoreFile f,
|
||||
Path mergedDir) throws IOException {
|
||||
Path referenceDir = new Path(new Path(mergedDir,
|
||||
mergedRegion.getEncodedName()), familyName);
|
||||
public Path mergeStoreFile(RegionInfo mergingRegion, String familyName, HStoreFile f)
|
||||
throws IOException {
|
||||
Path referenceDir = new Path(getMergesDir(regionInfoForFs), familyName);
|
||||
// A whole reference to the store file.
|
||||
Reference r = Reference.createTopReference(regionInfoForFs.getStartKey());
|
||||
Reference r = Reference.createTopReference(mergingRegion.getStartKey());
|
||||
// Add the referred-to regions name as a dot separated suffix.
|
||||
// See REF_NAME_REGEX regex above. The referred-to regions name is
|
||||
// up in the path of the passed in <code>f</code> -- parentdir is family,
|
||||
// then the directory above is the region name.
|
||||
String mergingRegionName = regionInfoForFs.getEncodedName();
|
||||
String mergingRegionName = mergingRegion.getEncodedName();
|
||||
// Write reference with same file id only with the other region name as
|
||||
// suffix and into the new region location (under same family).
|
||||
Path p = new Path(referenceDir, f.getPath().getName() + "."
|
||||
+ mergingRegionName);
|
||||
+ mergingRegionName);
|
||||
return r.write(fs, p);
|
||||
}
|
||||
|
||||
/**
|
||||
* Commit a merged region, moving it from the merges temporary directory to
|
||||
* the proper location in the filesystem.
|
||||
* @param mergedRegionInfo merged region {@link RegionInfo}
|
||||
* Commit a merged region, making it ready for use.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void commitMergedRegion(final RegionInfo mergedRegionInfo) throws IOException {
|
||||
Path regionDir = new Path(this.tableDir, mergedRegionInfo.getEncodedName());
|
||||
Path mergedRegionTmpDir = this.getMergesDir(mergedRegionInfo);
|
||||
// Move the tmp dir to the expected location
|
||||
if (mergedRegionTmpDir != null && fs.exists(mergedRegionTmpDir)) {
|
||||
|
||||
public void commitMergedRegion() throws IOException {
|
||||
Path regionDir = getMergesDir(regionInfoForFs);
|
||||
if (regionDir != null && fs.exists(regionDir)) {
|
||||
// Write HRI to a file in case we need to recover hbase:meta
|
||||
Path regionInfoFile = new Path(mergedRegionTmpDir, REGION_INFO_FILE);
|
||||
Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
|
||||
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
|
||||
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
|
||||
|
||||
if (!fs.rename(mergedRegionTmpDir, regionDir)) {
|
||||
throw new IOException("Unable to rename " + mergedRegionTmpDir + " to "
|
||||
+ regionDir);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ===========================================================================
|
||||
// Create/Open/Delete Helpers
|
||||
// ===========================================================================
|
||||
/**
|
||||
* Log the current state of the region
|
||||
* @param LOG log to output information
|
||||
* @throws IOException if an unexpected exception occurs
|
||||
*/
|
||||
void logFileSystemState(final Logger LOG) throws IOException {
|
||||
CommonFSUtils.logFileSystemState(fs, this.getRegionDir(), LOG);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param hri
|
||||
|
@ -1058,8 +962,6 @@ public class HRegionFileSystem {
|
|||
if (!readOnly) {
|
||||
// Cleanup temporary directories
|
||||
regionFs.cleanupTempDir();
|
||||
regionFs.cleanupSplitsDir();
|
||||
regionFs.cleanupMergesDir();
|
||||
|
||||
// If it doesn't exists, Write HRI to a file, in case we need to recover hbase:meta
|
||||
// Only create HRI if we are the default replica
|
||||
|
|
|
@ -0,0 +1,252 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
|
||||
@Category({RegionServerTests.class, LargeTests.class})
|
||||
public class TestDirectStoreSplitsMerges {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestDirectStoreSplitsMerges.class);
|
||||
|
||||
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void after() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSplitStoreDir() throws Exception {
|
||||
TableName table = TableName.valueOf(name.getMethodName());
|
||||
TEST_UTIL.createTable(table, FAMILY_NAME);
|
||||
//first put some data in order to have a store file created
|
||||
putThreeRowsAndFlush(table);
|
||||
HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
|
||||
HRegionFileSystem regionFS = region.getStores().get(0).getRegionFileSystem();
|
||||
RegionInfo daughterA =
|
||||
RegionInfoBuilder.newBuilder(table).setStartKey(region.getRegionInfo().getStartKey()).
|
||||
setEndKey(Bytes.toBytes("002")).setSplit(false)
|
||||
.setRegionId(region.getRegionInfo().getRegionId() +
|
||||
EnvironmentEdgeManager.currentTime()).build();
|
||||
HStoreFile file = (HStoreFile) region.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
|
||||
Path result = regionFS
|
||||
.splitStoreFile(daughterA, Bytes.toString(FAMILY_NAME), file,
|
||||
Bytes.toBytes("002"), false, region.getSplitPolicy());
|
||||
//asserts the reference file naming is correct
|
||||
validateResultingFile(region.getRegionInfo().getEncodedName(), result);
|
||||
//Additionally check if split region dir was created directly under table dir, not on .tmp
|
||||
Path resultGreatGrandParent = result.getParent().getParent().getParent();
|
||||
assertEquals(regionFS.getTableDir().getName(), resultGreatGrandParent.getName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeStoreFile() throws Exception {
|
||||
TableName table = TableName.valueOf(name.getMethodName());
|
||||
TEST_UTIL.createTable(table, FAMILY_NAME);
|
||||
//splitting the table first
|
||||
TEST_UTIL.getAdmin().split(table, Bytes.toBytes("002"));
|
||||
waitForSplitProcComplete(1000, 10);
|
||||
//Add data and flush to create files in the two different regions
|
||||
putThreeRowsAndFlush(table);
|
||||
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
|
||||
HRegion first = regions.get(0);
|
||||
HRegion second = regions.get(1);
|
||||
HRegionFileSystem regionFS = first.getRegionFileSystem();
|
||||
|
||||
RegionInfo mergeResult =
|
||||
RegionInfoBuilder.newBuilder(table).setStartKey(first.getRegionInfo().getStartKey())
|
||||
.setEndKey(second.getRegionInfo().getEndKey()).setSplit(false)
|
||||
.setRegionId(first.getRegionInfo().getRegionId() +
|
||||
EnvironmentEdgeManager.currentTime()).build();
|
||||
|
||||
HRegionFileSystem mergeRegionFs = HRegionFileSystem
|
||||
.createRegionOnFileSystem(TEST_UTIL.getHBaseCluster().getMaster().getConfiguration(),
|
||||
regionFS.getFileSystem(), regionFS.getTableDir(), mergeResult);
|
||||
|
||||
//merge file from first region
|
||||
HStoreFile file = (HStoreFile) first.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
|
||||
mergeFileFromRegion(mergeRegionFs, first, file);
|
||||
//merge file from second region
|
||||
file = (HStoreFile) second.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
|
||||
mergeFileFromRegion(mergeRegionFs, second, file);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCommitDaughterRegionNoFiles() throws Exception {
|
||||
TableName table = TableName.valueOf(name.getMethodName());
|
||||
TEST_UTIL.createTable(table, FAMILY_NAME);
|
||||
HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
|
||||
HRegionFileSystem regionFS = region.getStores().get(0).getRegionFileSystem();
|
||||
RegionInfo daughterA =
|
||||
RegionInfoBuilder.newBuilder(table).setStartKey(region.getRegionInfo().
|
||||
getStartKey()).setEndKey(Bytes.toBytes("002")).setSplit(false).
|
||||
setRegionId(region.getRegionInfo().getRegionId() +
|
||||
EnvironmentEdgeManager.currentTime()).build();
|
||||
Path splitDir = regionFS.getSplitsDir(daughterA);
|
||||
Path result = regionFS.commitDaughterRegion(daughterA);
|
||||
assertEquals(splitDir, result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCommitDaughterRegionWithFiles() throws Exception {
|
||||
TableName table = TableName.valueOf(name.getMethodName());
|
||||
TEST_UTIL.createTable(table, FAMILY_NAME);
|
||||
//first put some data in order to have a store file created
|
||||
putThreeRowsAndFlush(table);
|
||||
HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
|
||||
HRegionFileSystem regionFS = region.getStores().get(0).getRegionFileSystem();
|
||||
RegionInfo daughterA =
|
||||
RegionInfoBuilder.newBuilder(table).setStartKey(region.getRegionInfo().getStartKey()).
|
||||
setEndKey(Bytes.toBytes("002")).setSplit(false).
|
||||
setRegionId(region.getRegionInfo().getRegionId() +
|
||||
EnvironmentEdgeManager.currentTime()).build();
|
||||
RegionInfo daughterB = RegionInfoBuilder.newBuilder(table).setStartKey(Bytes.toBytes("002"))
|
||||
.setEndKey(region.getRegionInfo().getEndKey()).setSplit(false)
|
||||
.setRegionId(region.getRegionInfo().getRegionId()).build();
|
||||
Path splitDirA = regionFS.getSplitsDir(daughterA);
|
||||
Path splitDirB = regionFS.getSplitsDir(daughterB);
|
||||
HStoreFile file = (HStoreFile) region.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
|
||||
regionFS
|
||||
.splitStoreFile(daughterA, Bytes.toString(FAMILY_NAME), file,
|
||||
Bytes.toBytes("002"), false, region.getSplitPolicy());
|
||||
regionFS
|
||||
.splitStoreFile(daughterB, Bytes.toString(FAMILY_NAME), file,
|
||||
Bytes.toBytes("002"), true, region.getSplitPolicy());
|
||||
Path resultA = regionFS.commitDaughterRegion(daughterA);
|
||||
Path resultB = regionFS.commitDaughterRegion(daughterB);
|
||||
assertEquals(splitDirA, resultA);
|
||||
assertEquals(splitDirB, resultB);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCommitMergedRegion() throws Exception {
|
||||
TableName table = TableName.valueOf(name.getMethodName());
|
||||
TEST_UTIL.createTable(table, FAMILY_NAME);
|
||||
//splitting the table first
|
||||
TEST_UTIL.getAdmin().split(table, Bytes.toBytes("002"));
|
||||
waitForSplitProcComplete(1000, 10);
|
||||
//Add data and flush to create files in the two different regions
|
||||
putThreeRowsAndFlush(table);
|
||||
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
|
||||
HRegion first = regions.get(0);
|
||||
HRegion second = regions.get(1);
|
||||
HRegionFileSystem regionFS = first.getRegionFileSystem();
|
||||
|
||||
RegionInfo mergeResult =
|
||||
RegionInfoBuilder.newBuilder(table).setStartKey(first.getRegionInfo().getStartKey())
|
||||
.setEndKey(second.getRegionInfo().getEndKey()).setSplit(false)
|
||||
.setRegionId(first.getRegionInfo().getRegionId() +
|
||||
EnvironmentEdgeManager.currentTime()).build();
|
||||
|
||||
HRegionFileSystem mergeRegionFs = HRegionFileSystem
|
||||
.createRegionOnFileSystem(TEST_UTIL.getHBaseCluster().getMaster().getConfiguration(),
|
||||
regionFS.getFileSystem(), regionFS.getTableDir(), mergeResult);
|
||||
|
||||
//merge file from first region
|
||||
HStoreFile file = (HStoreFile) first.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
|
||||
mergeFileFromRegion(mergeRegionFs, first, file);
|
||||
//merge file from second region
|
||||
file = (HStoreFile) second.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
|
||||
mergeFileFromRegion(mergeRegionFs, second, file);
|
||||
mergeRegionFs.commitMergedRegion();
|
||||
}
|
||||
|
||||
private void waitForSplitProcComplete(int attempts, int waitTime) throws Exception {
|
||||
Procedure
|
||||
splitProc = TEST_UTIL.getHBaseCluster().getMaster().getProcedures().stream().findFirst().
|
||||
filter( p -> p instanceof SplitTableRegionProcedure).get();
|
||||
int count = 0;
|
||||
while((splitProc.isWaiting()||splitProc.isRunnable())&&count<attempts){
|
||||
synchronized (splitProc) {
|
||||
splitProc.wait(waitTime);
|
||||
}
|
||||
count++;
|
||||
}
|
||||
assertTrue(splitProc.isSuccess());
|
||||
}
|
||||
|
||||
private void mergeFileFromRegion(HRegionFileSystem regionFS, HRegion regionToMerge,
|
||||
HStoreFile file) throws IOException {
|
||||
Path mergedFile = regionFS.mergeStoreFile(regionToMerge.getRegionInfo(),
|
||||
Bytes.toString(FAMILY_NAME), file);
|
||||
validateResultingFile(regionToMerge.getRegionInfo().getEncodedName(), mergedFile);
|
||||
}
|
||||
|
||||
private void validateResultingFile(String originalRegion, Path result){
|
||||
assertEquals(originalRegion, result.getName().split("\\.")[1]);
|
||||
//asserts we are under the cf directory
|
||||
Path resultParent = result.getParent();
|
||||
assertEquals(Bytes.toString(FAMILY_NAME), resultParent.getName());
|
||||
}
|
||||
|
||||
private void putThreeRowsAndFlush(TableName table) throws IOException {
|
||||
Table tbl = TEST_UTIL.getConnection().getTable(table);
|
||||
Put put = new Put(Bytes.toBytes("001"));
|
||||
byte[] qualifier = Bytes.toBytes("1");
|
||||
put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(1));
|
||||
tbl.put(put);
|
||||
put = new Put(Bytes.toBytes("002"));
|
||||
put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(2));
|
||||
tbl.put(put);
|
||||
put = new Put(Bytes.toBytes("003"));
|
||||
put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(2));
|
||||
tbl.put(put);
|
||||
TEST_UTIL.flush(table);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue