From 716c685497ab5b81bb5f6955d9fe52461be93f42 Mon Sep 17 00:00:00 2001 From: bitterfox Date: Tue, 23 Mar 2021 00:57:11 +0900 Subject: [PATCH] HBASE-25672 Backport HBASE-25608 to branch-1 (#3068) Signed-off-by: stack --- .../hbase/mapreduce/HFileOutputFormat2.java | 72 ++++++- .../mapreduce/TestHFileOutputFormat2.java | 190 +++++++++++++++++- 2 files changed, 252 insertions(+), 10 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 55ad814c81d..1896ec1e48d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -125,6 +125,13 @@ public class HFileOutputFormat2 public static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name"; + public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY = + "hbase.hfileoutputformat.remote.cluster.zookeeper.quorum"; + public static final String REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY = + "hbase.hfileoutputformat.remote.cluster.zookeeper." + HConstants.CLIENT_PORT_STR; + public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY = + "hbase.hfileoutputformat.remote.cluster." + HConstants.ZOOKEEPER_ZNODE_PARENT; + public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY; public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + "."; @@ -223,7 +230,8 @@ public class HFileOutputFormat2 HRegionLocation loc = null; String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY); if (tableName != null) { - try (Connection connection = ConnectionFactory.createConnection(conf); + try (Connection connection = ConnectionFactory.createConnection( + createRemoteClusterConf(conf)); RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) { loc = locator.getRegionLocation(rowKey); @@ -286,6 +294,22 @@ public class HFileOutputFormat2 this.rollRequested = false; } + private Configuration createRemoteClusterConf(Configuration conf) { + final Configuration newConf = new Configuration(conf); + + final String quorum = conf.get(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY); + final String clientPort = conf.get(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY); + final String parent = conf.get(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY); + + if (quorum != null && clientPort != null && parent != null) { + newConf.set(HConstants.ZOOKEEPER_QUORUM, quorum); + newConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(clientPort)); + newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parent); + } + + return newConf; + } + /* Create a new StoreFile.Writer. * @param family * @return A WriterLength, containing a new StoreFile.Writer. @@ -477,6 +501,7 @@ public class HFileOutputFormat2 *
 *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
 *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
 *     PutSortReducer)</li>
+ *   <li>Sets the HBase cluster key to load region locations for locality-sensitive output</li>
 * </ul>
 * The user should be sure to set the map output value class to either KeyValue or Put before
 * running this function.
@@ -484,6 +509,7 @@ public class HFileOutputFormat2
   public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
       throws IOException {
     configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
+    configureRemoteCluster(job, table.getConfiguration());
   }
 
   /**
@@ -574,6 +600,50 @@ public class HFileOutputFormat2
     LOG.info("Incremental table " + table.getName() + " output configured.");
   }
 
+  /**
+   * Configure the HBase cluster key for the remote cluster, so that region locations can be
+   * loaded when locality-sensitive output is enabled.
+   * It is not necessary to call this method explicitly when the cluster key of the HBase
+   * cluster used to load region locations is already set in the job configuration.
+   * Call this method when a different HBase cluster key is configured in the job
+   * configuration, for example when you load data from HBase cluster A using
+   * {@link TableInputFormat} and generate hfiles for HBase cluster B.
+   * Otherwise, HFileOutputFormat2 fetches region locations from cluster A and the
+   * locality-sensitive logic won't work correctly.
+   * {@link #configureIncrementalLoad(Job, Table, RegionLocator)} calls this method using
+   * {@link Table#getConfiguration} as clusterConf.
+   * See HBASE-25608.
+   *
+   * @param job which has configuration to be updated
+   * @param clusterConf which contains the cluster key of the HBase cluster to be
+   *          locality-sensitive to
+   *
+   * @see #configureIncrementalLoad(Job, Table, RegionLocator)
+   * @see #LOCALITY_SENSITIVE_CONF_KEY
+   * @see #REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY
+   * @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
+   * @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
+   */
+  public static void configureRemoteCluster(Job job, Configuration clusterConf) {
+    Configuration conf = job.getConfiguration();
+
+    if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+      return;
+    }
+
+    final String quorum = clusterConf.get(HConstants.ZOOKEEPER_QUORUM);
+    final int clientPort = clusterConf.getInt(
+        HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+    final String parent = clusterConf.get(
+        HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
+
+    conf.set(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY, quorum);
+    conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
+    conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);
+
+    LOG.info("ZK configs for remote cluster of bulkload are configured: " +
+        quorum + ":" + clientPort + "/" + parent);
+  }
+
   /**
    * Runs inside the task to deserialize column family to compression algorithm
    * map from the configuration.
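
To make the call pattern above concrete, here is a minimal sketch (not part of the patch) of the cluster-A-to-cluster-B scenario described in the configureRemoteCluster javadoc. Names such as CrossClusterBulkLoadSetup, confA, confB, tableOnB, and hfileOutDir are hypothetical, and the input side of the job (e.g. TableInputFormat reading from cluster A) is elided:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class CrossClusterBulkLoadSetup {
      /**
       * Builds a job whose configuration points at cluster A (confA) but whose
       * output is hfiles laid out for a table living on cluster B (confB).
       */
      public static Job createJob(Configuration confA, Configuration confB,
          TableName tableOnB, Path hfileOutDir) throws IOException {
        Job job = Job.getInstance(confA, "bulkload-for-cluster-B");

        try (Connection connB = ConnectionFactory.createConnection(confB);
            Table table = connB.getTable(tableOnB);
            RegionLocator locator = connB.getRegionLocator(tableOnB)) {
          // Partitioner, reducer, and per-family settings come from cluster B's table.
          HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), locator);
          // New in this patch: point the locality-sensitive region lookup at
          // cluster B's ZooKeeper. The (Job, Table, RegionLocator) overload of
          // configureIncrementalLoad now makes this call automatically.
          HFileOutputFormat2.configureRemoteCluster(job, connB.getConfiguration());
        }
        FileOutputFormat.setOutputPath(job, hfileOutDir);
        return job;
      }
    }

Since HConstants.CLIENT_PORT_STR resolves to "clientPort" and HConstants.ZOOKEEPER_ZNODE_PARENT to "zookeeper.znode.parent", the same effect can also be achieved by setting hbase.hfileoutputformat.remote.cluster.zookeeper.quorum, hbase.hfileoutputformat.remote.cluster.zookeeper.clientPort, and hbase.hfileoutputformat.remote.cluster.zookeeper.znode.parent directly in the job configuration.
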
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java index a0db8797aa9..e729ebcccf5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.mapreduce; +import static org.apache.hadoop.hbase.client.ConnectionFactory.createConnection; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -35,7 +36,11 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Random; import java.util.Set; +import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -62,8 +67,10 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.BufferedMutatorParams; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionLocator; @@ -78,17 +85,16 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile.Reader; -import org.apache.hadoop.hbase.mapreduce.TestImportTSVWithTTLs.TTLCheckingObserver; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; @@ -103,8 +109,6 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; import org.mockito.Mockito; -import com.google.common.collect.Lists; - /** * Simple test for {@link CellSortReducer} and {@link HFileOutputFormat2}. * Sets up and runs a mapreduce job that writes hfile output. 
@@ -1097,7 +1101,7 @@ public class TestHFileOutputFormat2 {
     generateRandomStartKeys(5);
     util.setJobWithoutMRCluster();
     util.startMiniCluster();
-    try (Connection conn = ConnectionFactory.createConnection();
+    try (Connection conn = createConnection();
         Admin admin = conn.getAdmin()) {
       final FileSystem fs = util.getDFSCluster().getFileSystem();
       HTable table = util.createTable(TABLE_NAME, FAMILIES);
@@ -1177,7 +1181,7 @@ public class TestHFileOutputFormat2 {
     generateRandomStartKeys(5);
     util.setJobWithoutMRCluster();
     util.startMiniCluster();
-    try (Connection conn = ConnectionFactory.createConnection(conf);
+    try (Connection conn = createConnection(conf);
         Admin admin = conn.getAdmin()){
       Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
       final FileSystem fs = util.getDFSCluster().getFileSystem();
@@ -1276,7 +1280,7 @@ public class TestHFileOutputFormat2 {
       }
     } else if ("incremental".equals(args[0])) {
       TableName tname = TableName.valueOf(args[1]);
-      try(Connection c = ConnectionFactory.createConnection(conf);
+      try(Connection c = createConnection(conf);
          Admin admin = c.getAdmin();
          RegionLocator regionLocator = c.getRegionLocator(tname)) {
         Path outDir = new Path("incremental-out");
@@ -1330,5 +1334,173 @@ public class TestHFileOutputFormat2 {
     }
 
-}
+  @Test
+  public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
+    // Start cluster A
+    util = new HBaseTestingUtility();
+    Configuration confA = util.getConfiguration();
+    int hostCount = 3;
+    int regionNum = 20;
+    String[] hostnames = new String[hostCount];
+    for (int i = 0; i < hostCount; ++i) {
+      hostnames[i] = "datanode_" + i;
+    }
+    util.setJobWithoutMRCluster();
+    util.startMiniCluster(1, hostCount, hostnames);
+    // Start cluster B
+    HBaseTestingUtility utilB = new HBaseTestingUtility();
+    Configuration confB = utilB.getConfiguration();
+    utilB.startMiniCluster(1, hostCount, hostnames);
+
+    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
+
+    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
+    TableName tableName = TableName.valueOf("table");
+    // Create the table in cluster B
+    try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys);
+        RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
+      // Generate the bulk load files. The job carries the ZooKeeper
+      // configuration for cluster A; assume we read from cluster A via
+      // TableInputFormat and create hfiles for cluster B.
+      Job job = new Job(confA, "testLocalMRIncrementalLoad");
+      Configuration jobConf = job.getConfiguration();
+      final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
+      job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
+      setupRandomGeneratorMapper(job, false);
+      HFileOutputFormat2.configureIncrementalLoad(job, table, r);
+
+      assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
+          jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
+      assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
+          jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
+      assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
+          jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));
+
+      FileOutputFormat.setOutputPath(job, testDir);
+
+      assertFalse(util.getTestFileSystem().exists(testDir));
+
+      assertTrue(job.waitForCompletion(true));
+
+      final List<Configuration> configs =
+          ConfigurationCaptorConnection.getCapturedConfigurations(key);
+
+      assertFalse(configs.isEmpty());
+      for (Configuration config : configs) {
+        assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
+            config.get(HConstants.ZOOKEEPER_QUORUM));
+        assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
+            config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
+        assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
+            config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+      }
+    } finally {
+      utilB.deleteTable(tableName);
+      testDir.getFileSystem(confA).delete(testDir, true);
+      util.shutdownMiniCluster();
+      utilB.shutdownMiniCluster();
+    }
+  }
+
+  private static class ConfigurationCaptorConnection implements Connection {
+    private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";
+
+    private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();
+
+    private final Connection delegate;
+
+    public ConfigurationCaptorConnection(
+        Configuration conf, boolean managed, ExecutorService es, User user)
+        throws IOException {
+      // Build the delegate from a copy without the captor installed, so the
+      // delegate is a real Connection rather than another captor.
+      Configuration confForDelegate = new Configuration(conf);
+      confForDelegate.unset(HConnection.HBASE_CLIENT_CONNECTION_IMPL);
+      delegate = createConnection(confForDelegate, es, user);
+
+      final String uuid = conf.get(UUID_KEY);
+      if (uuid != null) {
+        final UUID key = UUID.fromString(uuid);
+        List<Configuration> configurations = confs.get(key);
+        if (configurations == null) {
+          configurations = new CopyOnWriteArrayList<>();
+          confs.put(key, configurations);
+        }
+        configurations.add(conf);
+      }
+    }
+
+    static UUID configureConnectionImpl(Configuration conf) {
+      conf.setClass(HConnection.HBASE_CLIENT_CONNECTION_IMPL,
+          ConfigurationCaptorConnection.class, Connection.class);
+
+      final UUID uuid = UUID.randomUUID();
+      conf.set(UUID_KEY, uuid.toString());
+      return uuid;
+    }
+
+    static List<Configuration> getCapturedConfigurations(UUID key) {
+      return confs.get(key);
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return delegate.getConfiguration();
+    }
+
+    @Override
+    public Table getTable(TableName tableName) throws IOException {
+      return delegate.getTable(tableName);
+    }
+
+    @Override
+    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
+      return delegate.getTable(tableName, pool);
+    }
+
+    @Override
+    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
+      return delegate.getBufferedMutator(tableName);
+    }
+
+    @Override
+    public BufferedMutator getBufferedMutator(BufferedMutatorParams params)
+        throws IOException {
+      return delegate.getBufferedMutator(params);
+    }
+
+    @Override
+    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
+      return delegate.getRegionLocator(tableName);
+    }
+
+    @Override
+    public Admin getAdmin() throws IOException {
+      return delegate.getAdmin();
+    }
+
+    @Override
+    public String getClusterId() throws IOException {
+      return delegate.getClusterId();
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
+    }
+
+    @Override
+    public boolean isClosed() {
+      return delegate.isClosed();
+    }
+
+    @Override
+    public void abort(String why, Throwable e) {
+      delegate.abort(why, e);
+    }
+
+    @Override
+    public boolean isAborted() {
+      return delegate.isAborted();
+    }
+  }
+}
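
For reference, the injected-connection pattern used by the test above can be driven roughly as follows. This is a sketch, not part of the patch: it assumes ConfigurationCaptorConnection and its package-private static helpers are accessible from the calling test code, and that the code under test propagates the Configuration to ConnectionFactory.

    import java.util.List;
    import java.util.UUID;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class CaptorUsageSketch {
      static List<Configuration> runAndCapture() throws Exception {
        Configuration conf = HBaseConfiguration.create();

        // Swap in the captor as the Connection implementation for this
        // Configuration and get the key its captures will be stored under.
        UUID key = ConfigurationCaptorConnection.configureConnectionImpl(conf);

        // Every Connection opened from this Configuration now records the
        // Configuration it was built from, then delegates to a real Connection.
        try (Connection connection = ConnectionFactory.createConnection(conf)) {
          // ... exercise the code under test here ...
        }

        // The recorded Configurations, available for assertions.
        return ConfigurationCaptorConnection.getCapturedConfigurations(key);
      }
    }
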