From 5cea8112fde64c019e7c9ed8f9a4220834276eda Mon Sep 17 00:00:00 2001
From: "alan.zhao" <30570711+haohao0103@users.noreply.github.com>
Date: Wed, 10 May 2023 21:47:20 +0800
Subject: [PATCH] HBASE-27733 hfile split occurs during bulkload, the new
 HFile file does not specify favored nodes (#5121)

Co-authored-by: alanzhao
Signed-off-by: Wellington Chevreuil
---
 .../hadoop/hbase/tool/BulkLoadHFilesTool.java |  87 ++++++++++---
 .../hadoop/hbase/tool/TestBulkLoadHFiles.java | 114 +++++++++++++++++-
 2 files changed, 178 insertions(+), 23 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
index e54de3403e7..9b4e1aea906 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -22,6 +22,7 @@ import static java.lang.String.format;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -56,13 +57,17 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.AsyncAdmin;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
 import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -114,6 +119,13 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
 
   private static final Logger LOG = LoggerFactory.getLogger(BulkLoadHFilesTool.class);
 
+  /**
+   * Keep locality while generating HFiles for bulkload. See HBASE-12596
+   */
+  public static final String LOCALITY_SENSITIVE_CONF_KEY =
+    "hbase.bulkload.locality.sensitive.enabled";
+  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
+
   public static final String NAME = "completebulkload";
 
   /**
    * Whether to run validation on hfiles before loading.
@@ -540,7 +552,6 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
     while (!queue.isEmpty()) {
       final LoadQueueItem item = queue.remove();
-
       final Callable<Pair<List<LoadQueueItem>, String>> call =
         () -> groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
       splittingFutures.add(pool.submit(call));
@@ -578,8 +589,8 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     return UUID.randomUUID().toString().replaceAll("-", "");
   }
 
-  private List<LoadQueueItem> splitStoreFile(LoadQueueItem item, TableDescriptor tableDesc,
-    byte[] splitKey) throws IOException {
+  private List<LoadQueueItem> splitStoreFile(AsyncTableRegionLocator loc, LoadQueueItem item,
+    TableDescriptor tableDesc, byte[] splitKey) throws IOException {
     Path hfilePath = item.getFilePath();
     byte[] family = item.getFamily();
     Path tmpDir = hfilePath.getParent();
@@ -594,7 +605,8 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
 
     Path botOut = new Path(tmpDir, uniqueName + ".bottom");
     Path topOut = new Path(tmpDir, uniqueName + ".top");
-    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
+
+    splitStoreFile(loc, getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
 
     FileSystem fs = tmpDir.getFileSystem(getConf());
     fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
@@ -718,8 +730,9 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
       checkRegionIndexValid(splitIdx, startEndKeys, tableName);
     }
     byte[] splitPoint = startEndKeys.get(splitIdx).getSecond();
-    List<LoadQueueItem> lqis =
-      splitStoreFile(item, FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), splitPoint);
+    List<LoadQueueItem> lqis = splitStoreFile(conn.getRegionLocator(tableName), item,
+      FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), splitPoint);
+
     return new Pair<>(lqis, null);
   }
 
@@ -729,25 +742,27 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
   }
 
   /**
-   * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
-   * filters, etc.
+   * Split a storefile into a top and bottom half with favored nodes, maintaining the metadata,
+   * recreating bloom filters, etc.
    */
   @InterfaceAudience.Private
-  static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc,
-    byte[] splitKey, Path bottomOut, Path topOut) throws IOException {
+  static void splitStoreFile(AsyncTableRegionLocator loc, Configuration conf, Path inFile,
+    ColumnFamilyDescriptor familyDesc, byte[] splitKey, Path bottomOut, Path topOut)
+    throws IOException {
     // Open reader with no block cache, and not in-memory
     Reference topReference = Reference.createTopReference(splitKey);
     Reference bottomReference = Reference.createBottomReference(splitKey);
-    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
-    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
+    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc, loc);
+    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc, loc);
   }
 
   /**
-   * Copy half of an HFile into a new HFile.
+   * Copy half of an HFile into a new HFile with favored nodes.
    */
   private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
-    Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
+    Reference reference, ColumnFamilyDescriptor familyDescriptor, AsyncTableRegionLocator loc)
+    throws IOException {
     FileSystem fs = inFile.getFileSystem(conf);
     CacheConfig cacheConf = CacheConfig.DISABLED;
     HalfStoreFileReader halfReader = null;
@@ -769,12 +784,50 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
         .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize)
         .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
         .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
-      halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
-        .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+
       HFileScanner scanner = halfReader.getScanner(false, false, false);
       scanner.seekTo();
       do {
-        halfWriter.append(scanner.getCell());
+        final Cell cell = scanner.getCell();
+        if (null != halfWriter) {
+          halfWriter.append(cell);
+        } else {
+          // Lazily init the writer on the first cell: its row tells us which region, and
+          // therefore which host, should be favored for block placement.
+          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+            byte[] rowKey = CellUtil.cloneRow(cell);
+            HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey));
+            InetSocketAddress[] favoredNodes = null;
+            if (null == hRegionLocation) {
+              LOG.warn("Failed to get region location for rowkey {}, using writer without favored nodes.",
+                Bytes.toString(rowKey));
+              halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+                .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+            } else {
+              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
+              InetSocketAddress initialIsa =
+                new InetSocketAddress(hRegionLocation.getHostname(), hRegionLocation.getPort());
+              if (initialIsa.isUnresolved()) {
+                LOG.warn("Failed to resolve location for region {}, using writer without favored nodes.",
+                  hRegionLocation);
+                halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+                  .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+              } else {
+                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
+                favoredNodes = new InetSocketAddress[] { initialIsa };
+                halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+                  .withBloomType(bloomFilterType).withFileContext(hFileContext)
+                  .withFavoredNodes(favoredNodes).build();
+              }
+            }
+          } else {
+            halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+              .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+          }
+          halfWriter.append(cell);
+        }
+
       } while (scanner.next());
 
       for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
index fecf4c7ec2c..c6cbb6458c5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -43,10 +44,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Table;
@@ -63,7 +66,12 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.hamcrest.MatcherAssert;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -555,34 +563,70 @@ public class TestBulkLoadHFiles {
     FileSystem fs = util.getTestFileSystem();
     Path testIn = new Path(dir, "testhfile");
     ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
+    String tableName = tn.getMethodName();
+    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
     HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
       Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
 
     Path bottomOut = new Path(dir, "bottom.out");
     Path topOut = new Path(dir, "top.out");
-    BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
-      Bytes.toBytes("ggg"), bottomOut, topOut);
+    BulkLoadHFilesTool.splitStoreFile(
+      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
+      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
 
     int rowCount = verifyHFile(bottomOut);
     rowCount += verifyHFile(topOut);
     assertEquals(1000, rowCount);
   }
 
+  /**
+   * Test HFile splits with favored nodes.
+   */
+  @Test
+  public void testSplitStoreFileWithFavoriteNodes() throws IOException {
+
+    Path dir = new Path(util.getDefaultRootDirPath(), "testhfile");
+    FileSystem fs = util.getDFSCluster().getFileSystem();
+
+    Path testIn = new Path(dir, "testSplitStoreFileWithFavoriteNodes");
+    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
+    String tableName = tn.getMethodName();
+    Table table = util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
+    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
+      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
+
+    Path bottomOut = new Path(dir, "bottom.out");
+    Path topOut = new Path(dir, "top.out");
+
+    final AsyncTableRegionLocator regionLocator =
+      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName));
+    BulkLoadHFilesTool.splitStoreFile(regionLocator, util.getConfiguration(), testIn, familyDesc,
+      Bytes.toBytes("ggg"), bottomOut, topOut);
+    verifyHFileFavoriteNode(topOut, regionLocator, fs);
+    verifyHFileFavoriteNode(bottomOut, regionLocator, fs);
+    int rowCount = verifyHFile(bottomOut);
+    rowCount += verifyHFile(topOut);
+    assertEquals(1000, rowCount);
+  }
+
   @Test
   public void testSplitStoreFileWithCreateTimeTS() throws IOException {
     Path dir = util.getDataTestDirOnTestFS("testSplitStoreFileWithCreateTimeTS");
     FileSystem fs = util.getTestFileSystem();
     Path testIn = new Path(dir, "testhfile");
     ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
+    String tableName = tn.getMethodName();
+    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
     HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
       Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
 
     Path bottomOut = new Path(dir, "bottom.out");
     Path topOut = new Path(dir, "top.out");
-    BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
-      Bytes.toBytes("ggg"), bottomOut, topOut);
+    BulkLoadHFilesTool.splitStoreFile(
+      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
+      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
 
     verifyHFileCreateTimeTS(bottomOut);
     verifyHFileCreateTimeTS(topOut);
@@ -615,14 +659,17 @@ public class TestBulkLoadHFiles {
     Path testIn = new Path(dir, "testhfile");
     ColumnFamilyDescriptor familyDesc =
       ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build();
+    String tableName = tn.getMethodName();
+    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
     HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn,
       bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
 
     Path bottomOut = new Path(dir, "bottom.out");
     Path topOut = new Path(dir, "top.out");
-    BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
-      Bytes.toBytes("ggg"), bottomOut, topOut);
+    BulkLoadHFilesTool.splitStoreFile(
+      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
+      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
 
     int rowCount = verifyHFile(bottomOut);
     rowCount += verifyHFile(topOut);
@@ -654,6 +701,61 @@ public class TestBulkLoadHFiles {
     }
   }
 
+  /**
+   * Verify that a split storefile was written with the expected favored node (block location)
+   * information.
+   */
+  private void verifyHFileFavoriteNode(Path p, AsyncTableRegionLocator regionLocator, FileSystem fs)
+    throws IOException {
+    Configuration conf = util.getConfiguration();
+
+    try (HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf)) {
+
+      final byte[] firstRowkey = reader.getFirstRowKey().get();
+      final HRegionLocation hRegionLocation =
+        FutureUtils.get(regionLocator.getRegionLocation(firstRowkey));
+
+      final String targetHostName = hRegionLocation.getHostname();
+
+      if (fs instanceof DistributedFileSystem) {
+        String pathStr = p.toUri().getPath();
+        LocatedBlocks blocks =
+          ((DistributedFileSystem) fs).getClient().getLocatedBlocks(pathStr, 0L);
+
+        boolean isFavoriteNode = false;
+        List<LocatedBlock> locatedBlocks = blocks.getLocatedBlocks();
+        int index = 0;
+        do {
+          if (index > 0) {
+            assertTrue("failed to use favored nodes", isFavoriteNode);
+          }
+          isFavoriteNode = false;
+          final LocatedBlock block = locatedBlocks.get(index);
+
+          final DatanodeInfo[] locations = block.getLocations();
+          for (DatanodeInfo location : locations) {
+
+            final String hostName = location.getHostName();
+            // In the minicluster the datanode may report 127.0.0.1 while the region location
+            // reports the real hostname (or vice versa), so accept either form.
+            if (
+              targetHostName.equals(hostName.equals("127.0.0.1")
+                ? InetAddress.getLocalHost().getHostName()
+                : "127.0.0.1") || targetHostName.equals(hostName)
+            ) {
+              isFavoriteNode = true;
+              break;
+            }
+          }
+
+          index++;
+        } while (index < locatedBlocks.size());
+        if (index > 0) {
+          assertTrue("failed to use favored nodes", isFavoriteNode);
+        }
+
+      }
+
+    }
+  }
+
   private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
     Integer value = map.containsKey(first) ? map.get(first) : 0;
     map.put(first, value + 1);
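
Note on the pattern above: copyHFileHalf() can no longer build its StoreFileWriter up
front, because the favored node depends on a row key, and no row key is known until the
first cell is read from the half-file scanner. The writer is therefore created lazily on
the first cell: the row is used to look up the region location, and the hosting
RegionServer's address is handed to HDFS as a favored node so the blocks of the split
HFile land on the machine that will serve them, preserving locality across the split.
What follows is a condensed, illustrative sketch of that pattern, not code from the
patch: the class and method names (FavoredNodesWriterSketch, createWriter) are invented
here, while every HBase API the sketch calls appears in the diff above.

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.FutureUtils;

final class FavoredNodesWriterSketch {

  private FavoredNodesWriterSketch() {
  }

  /**
   * Build a StoreFileWriter for outFile. If the first cell's region can be located and
   * its host resolves, pass that host to HDFS as a favored node; otherwise fall back to
   * a plain writer, mirroring the fallback branches in copyHFileHalf() above.
   */
  static StoreFileWriter createWriter(Configuration conf, FileSystem fs, Path outFile,
    BloomType bloomType, HFileContext fileContext, AsyncTableRegionLocator locator,
    Cell firstCell) throws IOException {
    StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
      .withFilePath(outFile).withBloomType(bloomType).withFileContext(fileContext);
    // Only the first cell is consulted: one region location decides the favored node
    // for the whole output file, exactly as in the patch.
    byte[] row = CellUtil.cloneRow(firstCell);
    HRegionLocation location = FutureUtils.get(locator.getRegionLocation(row));
    if (location != null) {
      InetSocketAddress isa =
        new InetSocketAddress(location.getHostname(), location.getPort());
      if (!isa.isUnresolved()) {
        // Favored nodes are a placement hint to HDFS, not a guarantee.
        builder.withFavoredNodes(new InetSocketAddress[] { isa });
      }
    }
    return builder.build();
  }
}

Because favored nodes are only a hint, HDFS may still place replicas elsewhere (for
example when the favored datanode is full). That is why the patch keeps fallback paths
for a null region location or an unresolved address, and why the test walks every
located block of the split files rather than assuming placement succeeded.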