From 976629c046a04c1076c8ec3e30445e271189ebe6 Mon Sep 17 00:00:00 2001 From: bitterfox Date: Wed, 17 Mar 2021 13:47:36 +0900 Subject: [PATCH] HBASE-25608 Support HFileOutputFormat locality sensitive even destination cluster is different from source cluster (#2988) Signed-off-by: stack --- .../hbase/mapreduce/HFileOutputFormat2.java | 72 ++++++- .../mapreduce/TestHFileOutputFormat2.java | 201 ++++++++++++++++++ 2 files changed, 272 insertions(+), 1 deletion(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index ee6d5331f3f..665bc32613c 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -163,6 +163,13 @@ public class HFileOutputFormat2 static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY = "hbase.mapreduce.use.multi.table.hfileoutputformat"; + public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY = + "hbase.hfileoutputformat.remote.cluster.zookeeper.quorum"; + public static final String REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY = + "hbase.hfileoutputformat.remote.cluster.zookeeper." + HConstants.CLIENT_PORT_STR; + public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY = + "hbase.hfileoutputformat.remote.cluster." + HConstants.ZOOKEEPER_ZNODE_PARENT; + public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY; public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + "."; @@ -274,7 +281,8 @@ public class HFileOutputFormat2 HRegionLocation loc = null; String tableName = Bytes.toString(tableNameBytes); if (tableName != null) { - try (Connection connection = ConnectionFactory.createConnection(conf); + try (Connection connection = ConnectionFactory.createConnection( + createRemoteClusterConf(conf)); RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) { loc = locator.getRegionLocation(rowKey); @@ -340,6 +348,22 @@ public class HFileOutputFormat2 wl.written = 0; } + private Configuration createRemoteClusterConf(Configuration conf) { + final Configuration newConf = new Configuration(conf); + + final String quorum = conf.get(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY); + final String clientPort = conf.get(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY); + final String parent = conf.get(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY); + + if (quorum != null && clientPort != null && parent != null) { + newConf.set(HConstants.ZOOKEEPER_QUORUM, quorum); + newConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(clientPort)); + newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parent); + } + + return newConf; + } + /* * Create a new StoreFile.Writer. * @return A WriterLength, containing a new StoreFile.Writer. @@ -518,6 +542,7 @@ public class HFileOutputFormat2 *
  <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
 *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
 *     PutSortReducer)</li>
+ *   <li>Sets the HBase cluster key to load region locations for locality-sensitive operation</li>
 * </ul>
 * The user should be sure to set the map output value class to either KeyValue or Put before
 * running this function.
@@ -525,6 +550,7 @@ public class HFileOutputFormat2
   public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
       throws IOException {
     configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
+    configureRemoteCluster(job, table.getConfiguration());
   }
 
   /**
@@ -657,6 +683,50 @@ public class HFileOutputFormat2
     LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
   }
 
+  /**
+   * Configure the HBase cluster key for the remote cluster from which region locations are
+   * loaded when locality-sensitive operation is enabled.
+   * It is not necessary to call this method explicitly when the cluster key of the HBase
+   * cluster used to load region locations is already configured in the job configuration.
+   * Call this method when a different HBase cluster key is configured in the job configuration;
+   * for example, call it when you load data from HBase cluster A using
+   * {@link TableInputFormat} and generate hfiles for HBase cluster B.
+   * Otherwise, HFileOutputFormat2 fetches region locations from cluster A and
+   * locality-sensitive operation won't work correctly.
+   * {@link #configureIncrementalLoad(Job, Table, RegionLocator)} calls this method using
+   * {@link Table#getConfiguration} as clusterConf.
+   * See HBASE-25608.
+   *
+   * @param job which has the configuration to be updated
+   * @param clusterConf which contains the cluster key of the HBase cluster to be
+   *          locality-sensitive to
+   *
+   * @see #configureIncrementalLoad(Job, Table, RegionLocator)
+   * @see #LOCALITY_SENSITIVE_CONF_KEY
+   * @see #REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY
+   * @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
+   * @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
+   */
+  public static void configureRemoteCluster(Job job, Configuration clusterConf) {
+    Configuration conf = job.getConfiguration();
+
+    if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+      return;
+    }
+
+    final String quorum = clusterConf.get(HConstants.ZOOKEEPER_QUORUM);
+    final int clientPort = clusterConf.getInt(
+      HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
+    final String parent = clusterConf.get(
+      HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
+
+    conf.set(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY, quorum);
+    conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
+    conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);
+
+    LOG.info("ZK configs for remote cluster of bulkload are configured: " +
+      quorum + ":" + clientPort + "/" + parent);
+  }
+
   /**
    * Runs inside the task to deserialize column family to compression algorithm
    * map from the configuration.
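With this change, a cross-cluster bulk-load driver needs no special handling as long as it configures the load from a Table handle on the destination cluster. A minimal usage sketch (not part of the patch; confA, confB, and tableName are hypothetical placeholders for the source-cluster job configuration, the destination-cluster configuration, and the target table):

    // Job configuration points at source cluster A (e.g. for TableInputFormat);
    // HFiles are generated for destination cluster B.
    Job job = Job.getInstance(confA, "cross-cluster-bulkload");

    try (Connection connB = ConnectionFactory.createConnection(confB);
         Table table = connB.getTable(tableName);
         RegionLocator locator = connB.getRegionLocator(tableName)) {
      // Via configureRemoteCluster, this copies cluster B's ZooKeeper quorum, client
      // port, and znode parent into the job configuration under the
      // REMOTE_CLUSTER_ZOOKEEPER_* keys, so the record writer resolves region
      // locations against cluster B rather than cluster A.
      HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
    }
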
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java index 5fd9dfa55fb..04b53e01ea2 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.mapreduce; +import static org.apache.hadoop.hbase.client.ConnectionFactory.createAsyncConnection; import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -38,7 +39,11 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Random; import java.util.Set; +import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; @@ -60,21 +65,28 @@ import org.apache.hadoop.hbase.HadoopShims; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.PerformanceEvaluation; import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.StartMiniClusterOption; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.BufferedMutatorParams; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.ConnectionUtils; +import org.apache.hadoop.hbase.client.Hbck; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableBuilder; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -90,12 +102,14 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; import org.apache.hadoop.hbase.tool.BulkLoadHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; @@ -1635,5 +1649,192 
@@ public class TestHFileOutputFormat2 {
   }
 
+  @Test
+  public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
+    // Start cluster A
+    util = new HBaseTestingUtility();
+    Configuration confA = util.getConfiguration();
+    int hostCount = 3;
+    int regionNum = 20;
+    String[] hostnames = new String[hostCount];
+    for (int i = 0; i < hostCount; ++i) {
+      hostnames[i] = "datanode_" + i;
+    }
+    StartMiniClusterOption option = StartMiniClusterOption.builder()
+      .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
+    util.startMiniCluster(option);
+
+    // Start cluster B
+    HBaseTestingUtility utilB = new HBaseTestingUtility();
+    Configuration confB = utilB.getConfiguration();
+    utilB.startMiniCluster(option);
+
+    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
+
+    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
+    TableName tableName = TableName.valueOf("table");
+    // Create table in cluster B
+    try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys);
+      RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
+      // Generate the bulk load files
+      // Job has zookeeper configuration for cluster A
+      // Assume reading from cluster A by TableInputFormat and creating hfiles to cluster B
+      Job job = new Job(confA, "testLocalMRIncrementalLoad");
+      Configuration jobConf = job.getConfiguration();
+      final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
+      job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
+      setupRandomGeneratorMapper(job, false);
+      HFileOutputFormat2.configureIncrementalLoad(job, table, r);
+
+      assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
+        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
+      assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
+        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
+      assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
+        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));
+
+      FileOutputFormat.setOutputPath(job, testDir);
+
+      assertFalse(util.getTestFileSystem().exists(testDir));
+
+      assertTrue(job.waitForCompletion(true));
+
+      final List<Configuration> configs =
+        ConfigurationCaptorConnection.getCapturedConfigurations(key);
+
+      assertFalse(configs.isEmpty());
+      for (Configuration config : configs) {
+        assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
+          config.get(HConstants.ZOOKEEPER_QUORUM));
+        assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
+          config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
+        assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
+          config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+      }
+    } finally {
+      utilB.deleteTable(tableName);
+      testDir.getFileSystem(confA).delete(testDir, true);
+      util.shutdownMiniCluster();
+      utilB.shutdownMiniCluster();
+    }
+  }
+
+  private static class ConfigurationCaptorConnection implements Connection {
+    private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";
+
+    private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();
+
+    private final Connection delegate;
+
+    public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user)
+      throws IOException {
+      delegate = FutureUtils.get(createAsyncConnection(conf, user)).toConnection();
+
+      final String uuid = conf.get(UUID_KEY);
+      if (uuid != null) {
+        confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf);
+      }
+    }
+
+    static UUID
configureConnectionImpl(Configuration conf) {
+      conf.setClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
+        ConfigurationCaptorConnection.class, Connection.class);
+
+      final UUID uuid = UUID.randomUUID();
+      conf.set(UUID_KEY, uuid.toString());
+      return uuid;
+    }
+
+    static List<Configuration> getCapturedConfigurations(UUID key) {
+      return confs.get(key);
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return delegate.getConfiguration();
+    }
+
+    @Override
+    public Table getTable(TableName tableName) throws IOException {
+      return delegate.getTable(tableName);
+    }
+
+    @Override
+    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
+      return delegate.getTable(tableName, pool);
+    }
+
+    @Override
+    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
+      return delegate.getBufferedMutator(tableName);
+    }
+
+    @Override
+    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
+      return delegate.getBufferedMutator(params);
+    }
+
+    @Override
+    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
+      return delegate.getRegionLocator(tableName);
+    }
+
+    @Override
+    public void clearRegionLocationCache() {
+      delegate.clearRegionLocationCache();
+    }
+
+    @Override
+    public Admin getAdmin() throws IOException {
+      return delegate.getAdmin();
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
+    }
+
+    @Override
+    public boolean isClosed() {
+      return delegate.isClosed();
+    }
+
+    @Override
+    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
+      return delegate.getTableBuilder(tableName, pool);
+    }
+
+    @Override
+    public AsyncConnection toAsyncConnection() {
+      return delegate.toAsyncConnection();
+    }
+
+    @Override
+    public String getClusterId() {
+      return delegate.getClusterId();
+    }
+
+    @Override
+    public Hbck getHbck() throws IOException {
+      return delegate.getHbck();
+    }
+
+    @Override
+    public Hbck getHbck(ServerName masterServer) throws IOException {
+      return delegate.getHbck(masterServer);
+    }
+
+    @Override
+    public void abort(String why, Throwable e) {
+      delegate.abort(why, e);
+    }
+
+    @Override
+    public boolean isAborted() {
+      return delegate.isAborted();
+    }
+  }
 }
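Note that only the configureIncrementalLoad(Job, Table, RegionLocator) overload wires the remote cluster key automatically; a job configured from a bare TableDescriptor can opt in by calling configureRemoteCluster itself. A minimal sketch, not part of the patch, where job and clusterBConf are hypothetical (clusterBConf carrying the destination cluster's ZooKeeper settings):

    // Explicit opt-in for jobs configured without a Table handle. This is a
    // no-op unless locality-sensitive mode (LOCALITY_SENSITIVE_CONF_KEY) is
    // enabled in the job configuration.
    HFileOutputFormat2.configureRemoteCluster(job, clusterBConf);

    // Equivalent manual wiring using the keys added by this patch; the writer's
    // createRemoteClusterConf only applies them when all three are present.
    // Values here are hypothetical.
    Configuration jobConf = job.getConfiguration();
    jobConf.set(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY,
      "zk1.example.com,zk2.example.com,zk3.example.com");
    jobConf.setInt(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, 2181);
    jobConf.set(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, "/hbase");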