HBASE-26421 Use HFileLink file to replace entire file's reference when splitting (#3842)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Xiaolin Ha 2021-11-18 21:36:36 +08:00 committed by GitHub
parent 1c47f80d83
commit b2571df7ae
8 changed files with 273 additions and 47 deletions

View File

@@ -79,8 +79,7 @@ public class HFileLink extends FileLink {
RegionInfoBuilder.ENCODED_REGION_NAME_REGEX, StoreFileInfo.HFILE_NAME_REGEX);
/** Define the HFile Link name parser in the form of: table=region-hfile */
-//made package private for testing
-static final Pattern LINK_NAME_PATTERN =
+public static final Pattern LINK_NAME_PATTERN =
Pattern.compile(String.format("^(?:(%s)(?:\\=))?(%s)=(%s)-(%s)$",
TableName.VALID_NAMESPACE_REGEX, TableName.VALID_TABLE_QUALIFIER_REGEX,
RegionInfoBuilder.ENCODED_REGION_NAME_REGEX, StoreFileInfo.HFILE_NAME_REGEX));
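With LINK_NAME_PATTERN now public, callers outside HFileLink can parse a link file name back into its namespace, table, region, and hfile parts, which is what HRegionFileSystem does below when re-splitting a link file. A minimal sketch (link name hypothetical; group(1) is the optional namespace and may be null):

    Matcher m = HFileLink.LINK_NAME_PATTERN
      .matcher("testTable=5fd87e1100e3733a2a67e4d46e6c0d66-abcdef0123456789abcdef0123456789");
    if (m.matches()) {
      TableName linkedTable = TableName.valueOf(m.group(1), m.group(2)); // namespace, qualifier
      String linkedRegion = m.group(3); // encoded name of the region owning the hfile
      String hfileName = m.group(4);    // the original hfile name
    }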
@@ -400,8 +399,33 @@ public class HFileLink extends FileLink {
String tableName = CommonFSUtils.getTableName(dstFamilyPath.getParent().getParent())
.getNameAsString();
+return create(conf, fs, dstFamilyPath, familyName, tableName, regionName, linkedTable,
+linkedRegion, hfileName, createBackRef);
+}
+/**
+ * Create a new HFileLink
+ *
+ * <p>It also adds a back-reference to the hfile back-reference directory
+ * to simplify the reference-count and the cleaning process.
+ * @param conf {@link Configuration} to read for the archive directory name
+ * @param fs {@link FileSystem} on which to write the HFileLink
+ * @param dstFamilyPath - Destination path (table/region/cf/)
+ * @param familyName - Destination column family name
+ * @param dstTableName - Destination table name
+ * @param dstRegionName - Destination region name
+ * @param linkedTable - Linked Table Name
+ * @param linkedRegion - Linked Region Name
+ * @param hfileName - Linked HFile name
+ * @param createBackRef - Whether back reference should be created. Defaults to true.
+ * @return true if the file is created, false if the file already exists
+ * @throws IOException on file or parent directory creation failure
+ */
+public static boolean create(final Configuration conf, final FileSystem fs,
+final Path dstFamilyPath, final String familyName, final String dstTableName,
+final String dstRegionName, final TableName linkedTable, final String linkedRegion,
+final String hfileName, final boolean createBackRef) throws IOException {
String name = createHFileLinkName(linkedTable, linkedRegion, hfileName);
-String refName = createBackReferenceName(tableName, regionName);
+String refName = createBackReferenceName(dstTableName, dstRegionName);
// Make sure the destination directory exists
fs.mkdirs(dstFamilyPath);
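A hedged sketch of calling the new overload directly when the destination names are already known, rather than deriving them from dstFamilyPath as the old variant above now does before delegating; all names are hypothetical:

    boolean created = HFileLink.create(conf, fs,
      dstFamilyPath,                        // daughter's family dir: table/region/cf
      "cf",                                 // familyName
      "t1",                                 // dstTableName
      "b3f0d3a4c2bb41c2aa1e2a23e6f5ce2f",   // dstRegionName (daughter region)
      TableName.valueOf("t1"),              // linkedTable (the parent's table)
      "9d2c3e2a8f6b4bb7a1d3e0c2b5a49f11",   // linkedRegion (the parent region)
      "0123456789abcdef0123456789abcdef",   // hfileName being linked
      true);                                // also write the back-reference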

View File

@@ -33,6 +33,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -142,6 +143,7 @@ public class SplitTableRegionProcedure
.setSplit(false)
.setRegionId(rid)
.build();
if (tableDescriptor.getRegionSplitPolicyClassName() != null) {
// Since we don't have region reference here, creating the split policy instance without it.
// This can be used to invoke methods which don't require Region reference. This instantiation
@@ -624,16 +626,16 @@ public class SplitTableRegionProcedure
Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs);
-assertReferenceFileCount(fs, expectedReferences.getFirst(),
+assertSplitResultFilesCount(fs, expectedReferences.getFirst(),
regionFs.getSplitsDir(daughterOneRI));
regionFs.commitDaughterRegion(daughterOneRI);
-assertReferenceFileCount(fs, expectedReferences.getFirst(),
+assertSplitResultFilesCount(fs, expectedReferences.getFirst(),
new Path(tabledir, daughterOneRI.getEncodedName()));
-assertReferenceFileCount(fs, expectedReferences.getSecond(),
+assertSplitResultFilesCount(fs, expectedReferences.getSecond(),
regionFs.getSplitsDir(daughterTwoRI));
regionFs.commitDaughterRegion(daughterTwoRI);
-assertReferenceFileCount(fs, expectedReferences.getSecond(),
+assertSplitResultFilesCount(fs, expectedReferences.getSecond(),
new Path(tabledir, daughterTwoRI.getEncodedName()));
}
@@ -773,11 +775,15 @@ public class SplitTableRegionProcedure
return new Pair<Integer, Integer>(daughterA, daughterB);
}
-private void assertReferenceFileCount(final FileSystem fs, final int expectedReferenceFileCount,
-final Path dir) throws IOException {
-if (expectedReferenceFileCount != 0 &&
-expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(fs, dir)) {
-throw new IOException("Failing split. Expected reference file count isn't equal.");
+private void assertSplitResultFilesCount(final FileSystem fs,
+final int expectedSplitResultFileCount, Path dir)
+throws IOException {
+if (expectedSplitResultFileCount != 0) {
+int resultFileCount = FSUtils.getRegionReferenceAndLinkFileCount(fs, dir);
+if (expectedSplitResultFileCount != resultFileCount) {
+throw new IOException("Failing split. Didn't have expected reference and HFileLink files"
++ ", expected=" + expectedSplitResultFileCount + ", actual=" + resultFileCount);
+}
}
}
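Since daughters may now receive whole-file links instead of half-file references, the post-split check counts both kinds; a hedged sketch (daughterDir hypothetical), matching the FSUtils hunk further down:

    // A daughter that got one half-file Reference plus one whole-file HFileLink
    // passes the check with expectedSplitResultFileCount == 2:
    int resultFileCount = FSUtils.getRegionReferenceAndLinkFileCount(fs, daughterDir);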

View File

@@ -1068,8 +1068,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
-LOG.info("Opened {}; next sequenceid={}; {}, {}",
-this.getRegionInfo().getShortNameToLog(), nextSeqId, this.splitPolicy, this.flushPolicy);
+LOG.info("Opened {}; next sequenceid={}; {}, {}", this.getRegionInfo().getShortNameToLog(),
+nextSeqId, this.splitPolicy, this.flushPolicy);
// A region can be reopened if failed a split; reset flags
this.closing.set(false);

View File

@@ -17,6 +17,7 @@
 */
package org.apache.hadoop.hbase.regionserver;
+import static org.apache.hadoop.hbase.io.HFileLink.LINK_NAME_PATTERN;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -27,6 +28,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
+import java.util.regex.Matcher;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -39,11 +41,13 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -53,7 +57,6 @@ import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
@@ -662,15 +665,17 @@ public class HRegionFileSystem {
LOG.warn("Found an already existing split file for {}. Assuming this is a recovery.", p);
return p;
}
+boolean createLinkFile = false;
if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) {
// Check whether the split row lies in the range of the store file
// If it is outside the range, return directly.
f.initReader();
try {
Cell splitKey = PrivateCellUtil.createFirstOnRow(splitRow);
+Optional<Cell> lastKey = f.getLastKey();
+Optional<Cell> firstKey = f.getFirstKey();
if (top) {
//check if larger than last key.
-Optional<Cell> lastKey = f.getLastKey();
// If lastKey is null means storefile is empty.
if (!lastKey.isPresent()) {
return null;
@@ -678,9 +683,12 @@ public class HRegionFileSystem {
if (f.getComparator().compare(splitKey, lastKey.get()) > 0) {
return null;
}
+if (firstKey.isPresent() && f.getComparator().compare(splitKey, firstKey.get()) <= 0) {
+LOG.debug("Will create HFileLink file for {}, top=true", f.getPath());
+createLinkFile = true;
+}
} else {
//check if smaller than first key
-Optional<Cell> firstKey = f.getFirstKey();
// If firstKey is null means storefile is empty.
if (!firstKey.isPresent()) {
return null;
@@ -688,11 +696,44 @@ public class HRegionFileSystem {
if (f.getComparator().compare(splitKey, firstKey.get()) < 0) {
return null;
}
+if (lastKey.isPresent() && f.getComparator().compare(splitKey, lastKey.get()) >= 0) {
+LOG.debug("Will create HFileLink file for {}, top=false", f.getPath());
+createLinkFile = true;
+}
}
} finally {
f.closeStoreFile(f.getCacheConf() != null ? f.getCacheConf().shouldEvictOnClose() : true);
}
}
+if (createLinkFile) {
+// create an HFileLink file instead of a Reference file for the child
+String hfileName = f.getPath().getName();
+TableName linkedTable = regionInfoForFs.getTable();
+String linkedRegion = regionInfoForFs.getEncodedName();
+try {
+if (HFileLink.isHFileLink(hfileName)) {
+Matcher m = LINK_NAME_PATTERN.matcher(hfileName);
+if (!m.matches()) {
+throw new IllegalArgumentException(hfileName + " is not a valid HFileLink name!");
+}
+linkedTable = TableName.valueOf(m.group(1), m.group(2));
+linkedRegion = m.group(3);
+hfileName = m.group(4);
+}
+// must create back reference here
+HFileLink.create(conf, fs, splitDir, familyName, hri.getTable().getNameAsString(),
+hri.getEncodedName(), linkedTable, linkedRegion, hfileName, true);
+Path path =
+new Path(splitDir, HFileLink.createHFileLinkName(linkedTable, linkedRegion, hfileName));
+LOG.info("Created linkFile: " + path.toString() + " for child: " + hri.getEncodedName()
++ ", parent: " + regionInfoForFs.getEncodedName());
+return path;
+} catch (IOException e) {
+// if creating the HFileLink file failed, skip the error and create a Reference file instead
+LOG.error("Create link file for " + hfileName + " for child " + hri.getEncodedName()
++ " failed, will create Reference file", e);
+}
+}
// A reference to the bottom half of the hsf store file.
Reference r =
top ? Reference.createTopReference(splitRow): Reference.createBottomReference(splitRow);
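The range checks above reduce to one rule; a standalone sketch (method name hypothetical) of when a daughter may take a whole-file HFileLink instead of a half-file Reference:

    static boolean daughterCoversWholeFile(CellComparator comparator, Cell splitKey,
        Cell firstKey, Cell lastKey, boolean top) {
      return top
        ? comparator.compare(splitKey, firstKey) <= 0 // every key sorts at or above splitRow
        : comparator.compare(splitKey, lastKey) >= 0; // every key sorts below splitRow
    }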

View File

@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;

View File

@@ -1056,28 +1056,65 @@ public final class FSUtils {
* @return List of paths to valid family directories in region dir.
* @throws IOException
*/
-public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
+public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir)
+throws IOException {
// assumes we are in a region dir.
-FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
-List<Path> familyDirs = new ArrayList<>(fds.length);
-for (FileStatus fdfs: fds) {
-Path fdPath = fdfs.getPath();
-familyDirs.add(fdPath);
-}
-return familyDirs;
+return getFilePaths(fs, regionDir, new FamilyDirFilter(fs));
}
-public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
-List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
-if (fds == null) {
-return Collections.emptyList();
+public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir)
+throws IOException {
+return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs));
}
-List<Path> referenceFiles = new ArrayList<>(fds.size());
+public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir)
+throws IOException {
+return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs));
+}
+private static List<Path> getFilePaths(final FileSystem fs, final Path dir,
+final PathFilter pathFilter) throws IOException {
+FileStatus[] fds = fs.listStatus(dir, pathFilter);
+List<Path> files = new ArrayList<>(fds.length);
for (FileStatus fdfs: fds) {
Path fdPath = fdfs.getPath();
-referenceFiles.add(fdPath);
+files.add(fdPath);
}
+return files;
+}
+public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) {
+int result = 0;
+try {
+for (Path familyDir : getFamilyDirs(fs, p)) {
+result += getReferenceAndLinkFilePaths(fs, familyDir).size();
+}
+} catch (IOException e) {
+LOG.warn("Error counting reference and link files.", e);
+}
+return result;
+}
+public static class ReferenceAndLinkFileFilter implements PathFilter {
+private final FileSystem fs;
+public ReferenceAndLinkFileFilter(FileSystem fs) {
+this.fs = fs;
+}
+@Override
+public boolean accept(Path rd) {
+try {
+// only files can be references.
+return !fs.getFileStatus(rd).isDirectory() && (StoreFileInfo.isReference(rd) ||
+HFileLink.isHFileLink(rd));
+} catch (IOException ioe) {
+// Maybe the file was moved or the fs was disconnected.
+LOG.warn("Skipping file " + rd + " due to IOException", ioe);
+return false;
+}
+}
-return referenceFiles;
}
/**

View File

@@ -132,14 +132,17 @@ public class TestHStoreFile {
/**
* Write a file and then assert that we can read from top and bottom halves using two
-* HalfMapFiles.
+* HalfMapFiles, as well as one HalfMapFile and one HFileLink file.
*/
@Test
-public void testBasicHalfMapFile() throws Exception {
+public void testBasicHalfAndHFileLinkMapFile() throws Exception {
final RegionInfo hri =
-RegionInfoBuilder.newBuilder(TableName.valueOf("testBasicHalfMapFileTb")).build();
+RegionInfoBuilder.newBuilder(TableName.valueOf("testBasicHalfAndHFileLinkMapFile")).build();
+// HFileLink can only resolve the hfiles it refers to under the table directory
+// created by CommonFSUtils, so the region directory must be created with
+// CommonFSUtils.getTableDir here.
HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs,
-new Path(testDir, hri.getTable().getNameAsString()), hri);
+CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), hri.getTable()), hri);
HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
@@ -395,6 +398,8 @@ public class TestHStoreFile {
f.initReader();
Cell midkey = f.getReader().midKey().get();
KeyValue midKV = (KeyValue) midkey;
+// 1. test using the midRow as the splitKey; this will generate two Reference files
+// in the children
byte[] midRow = CellUtil.cloneRow(midKV);
// Create top split.
RegionInfo topHri =
@@ -455,7 +460,7 @@ public class TestHStoreFile {
regionFs.cleanupDaughterRegion(topHri);
regionFs.cleanupDaughterRegion(bottomHri);
-// Next test using a midkey that does not exist in the file.
+// 2. test using a midkey which will generate one Reference file and one HFileLink file.
// First, do a key that is < than first key. Ensure splits behave
// properly.
byte[] badmidkey = Bytes.toBytes(" .");
View File

@@ -17,6 +17,7 @@
 */
package org.apache.hadoop.hbase.regionserver;
+import static org.apache.hadoop.hbase.client.TableDescriptorBuilder.SPLIT_POLICY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -41,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -48,6 +50,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.StartTestingClusterOption;
@@ -74,6 +77,7 @@ import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterRpcServices;
@@ -138,13 +142,13 @@ public class TestSplitTransactionOnCluster {
private SingleProcessHBaseCluster cluster = null;
private static final int NB_SERVERS = 3;
-static final HBaseTestingUtil TESTING_UTIL =
-new HBaseTestingUtil();
+static final HBaseTestingUtil TESTING_UTIL = new HBaseTestingUtil();
@Rule
public TestName name = new TestName();
-@BeforeClass public static void before() throws Exception {
+@BeforeClass
+public static void before() throws Exception {
TESTING_UTIL.getConfiguration().setInt(HConstants.HBASE_BALANCER_PERIOD, 60000);
StartTestingClusterOption option = StartTestingClusterOption.builder()
.masterClass(MyMaster.class).numRegionServers(NB_SERVERS).
@@ -152,11 +156,13 @@ public class TestSplitTransactionOnCluster {
TESTING_UTIL.startMiniCluster(option);
}
-@AfterClass public static void after() throws Exception {
+@AfterClass
+public static void after() throws Exception {
TESTING_UTIL.shutdownMiniCluster();
}
-@Before public void setup() throws IOException {
+@Before
+public void setup() throws IOException {
TESTING_UTIL.ensureSomeNonStoppedRegionServersAvailable(NB_SERVERS);
this.admin = TESTING_UTIL.getAdmin();
this.cluster = TESTING_UTIL.getMiniHBaseCluster();
@@ -359,6 +365,114 @@ public class TestSplitTransactionOnCluster {
admin.deleteTable(tableName);
}
+@Test
+public void testContinuousSplitUsingLinkFile() throws Exception {
+final TableName tableName = TableName.valueOf(name.getMethodName());
+// Create table then get the single region for our new table.
+byte[] cf = Bytes.toBytes("cf");
+TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
+.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf));
+String splitPolicy = ConstantSizeRegionSplitPolicy.class.getName();
+builder.setValue(SPLIT_POLICY, splitPolicy);
+admin.createTable(builder.build());
+admin.compactionSwitch(false, new ArrayList<>());
+assertNotEquals("Unable to retrieve regions of the table", -1,
+TESTING_UTIL.waitFor(10000, () -> cluster.getRegions(tableName).size() == 1));
+Table table = TESTING_UTIL.getConnection().getTable(tableName);
+// insert data
+insertData(tableName, admin, table, 10);
+insertData(tableName, admin, table, 20);
+insertData(tableName, admin, table, 40);
+int rowCount = 3 * 4;
+Scan scan = new Scan();
+scanValidate(scan, rowCount, table);
+// Split
+admin.splitRegionAsync(cluster.getRegions(tableName).get(0).getRegionInfo().getRegionName(),
+Bytes.toBytes("row14"));
+// Wait for the split to complete or get interrupted. If the split completes successfully,
+// the procedure returns true; if the split fails, the procedure throws an exception.
+Thread.sleep(3000);
+assertNotEquals("Table is not split properly?", -1,
+TESTING_UTIL.waitFor(3000, () -> cluster.getRegions(tableName).size() == 2));
+// we have 2 daughter regions
+HRegion hRegion1 = cluster.getRegions(tableName).get(0);
+HRegion hRegion2 = cluster.getRegions(tableName).get(1);
+HStore hStore1 = hRegion1.getStore(cf);
+HStore hStore2 = hRegion2.getStore(cf);
+// the sum of the two children's store files should equal their parent's
+assertEquals(3, hStore1.getStorefilesCount() + hStore2.getStorefilesCount());
+// both children should have link files
+for (StoreFile sf : hStore1.getStorefiles()) {
+assertTrue(HFileLink.isHFileLink(sf.getPath()));
+}
+for (StoreFile sf : hStore2.getStorefiles()) {
+assertTrue(HFileLink.isHFileLink(sf.getPath()));
+}
+// validate children data
+scan = new Scan();
+scanValidate(scan, rowCount, table);
+// Continuous split
+findRegionToSplit(tableName, "row24");
+Thread.sleep(3000);
+assertNotEquals("Table is not split properly?", -1,
+TESTING_UTIL.waitFor(3000, () -> cluster.getRegions(tableName).size() == 3));
+// now the table has 3 regions, each of which should have one link file
+for (HRegion newRegion : cluster.getRegions(tableName)) {
+assertEquals(1, newRegion.getStore(cf).getStorefilesCount());
+assertTrue(
+HFileLink.isHFileLink(newRegion.getStore(cf).getStorefiles().iterator().next().getPath()));
+}
+scan = new Scan();
+scanValidate(scan, rowCount, table);
+// Continuous split again, this time at a random key inside an HFileLink's range,
+// which generates Reference files. After this we cannot keep splitting link-style,
+// because there are Reference files.
+findRegionToSplit(tableName, "row11");
+Thread.sleep(3000);
+assertNotEquals("Table is not split properly?", -1,
+TESTING_UTIL.waitFor(3000, () -> cluster.getRegions(tableName).size() == 4));
+scan = new Scan();
+scanValidate(scan, rowCount, table);
+}
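Why the last split ("row11") ends the link-only chain: that key falls inside the linked file's key range, so neither daughter covers the whole file and HRegionFileSystem.splitStoreFile falls back to its existing Reference path (a sketch of the fallback shown earlier, not new code):

    // splitRow inside (firstKey, lastKey): each daughter gets a half-file Reference
    // over the HFileLink, and regions holding References must compact before
    // they can split again.
    Reference r = top ? Reference.createTopReference(splitRow)
                      : Reference.createBottomReference(splitRow);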
+private void findRegionToSplit(TableName tableName, String splitRowKey) throws Exception {
+HRegion toSplit = null;
+byte[] toSplitKey = Bytes.toBytes(splitRowKey);
+for (HRegion rg : cluster.getRegions(tableName)) {
+LOG.debug("startKey=" + Bytes.toStringBinary(rg.getRegionInfo().getStartKey())
++ ", getEndKey()=" + Bytes.toStringBinary(rg.getRegionInfo().getEndKey())
++ ", row=" + splitRowKey);
+if ((rg.getRegionInfo().getStartKey().length == 0
+|| CellComparator.getInstance().compare(
+PrivateCellUtil.createFirstOnRow(rg.getRegionInfo().getStartKey()),
+PrivateCellUtil.createFirstOnRow(toSplitKey)) <= 0)
+&& (rg.getRegionInfo().getEndKey().length == 0
+|| CellComparator.getInstance().compare(
+PrivateCellUtil.createFirstOnRow(rg.getRegionInfo().getEndKey()),
+PrivateCellUtil.createFirstOnRow(toSplitKey)) >= 0)) {
+toSplit = rg;
+}
+}
+assertNotNull(toSplit);
+admin.splitRegionAsync(toSplit.getRegionInfo().getRegionName(), toSplitKey);
+}
+private static void scanValidate(Scan scan, int expectedRowCount, Table table)
+throws IOException {
+ResultScanner scanner = table.getScanner(scan);
+int rows = 0;
+for (Result result : scanner) {
+rows++;
+}
+scanner.close();
+assertEquals(expectedRowCount, rows);
+}
public static class FailingSplitMasterObserver implements MasterCoprocessor, MasterObserver {
volatile CountDownLatch latch;