diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index d9ba4bd6fb2..02b5768624a 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -176,6 +176,13 @@ public class HFileOutputFormat2
static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
"hbase.mapreduce.use.multi.table.hfileoutputformat";
+ public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
+ "hbase.hfileoutputformat.remote.cluster.zookeeper.quorum";
+ public static final String REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY =
+ "hbase.hfileoutputformat.remote.cluster.zookeeper." + HConstants.CLIENT_PORT_STR;
+ public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY =
+ "hbase.hfileoutputformat.remote.cluster." + HConstants.ZOOKEEPER_ZNODE_PARENT;
+
public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
@@ -288,7 +295,8 @@ public class HFileOutputFormat2
String tableName = Bytes.toString(tableNameBytes);
if (tableName != null) {
- try (Connection connection = ConnectionFactory.createConnection(conf);
+ try (Connection connection = ConnectionFactory.createConnection(
+ createRemoteClusterConf(conf));
RegionLocator locator =
connection.getRegionLocator(TableName.valueOf(tableName))) {
loc = locator.getRegionLocation(rowKey);
@@ -358,6 +366,22 @@ public class HFileOutputFormat2
wl.written = 0;
}
+ private Configuration createRemoteClusterConf(Configuration conf) {
+ final Configuration newConf = new Configuration(conf);
+
+ final String quorum = conf.get(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY);
+ final String clientPort = conf.get(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY);
+ final String parent = conf.get(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY);
+
+ if (quorum != null && clientPort != null && parent != null) {
+ newConf.set(HConstants.ZOOKEEPER_QUORUM, quorum);
+ newConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(clientPort));
+ newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parent);
+ }
+
+ return newConf;
+ }
+
/*
* Create a new StoreFile.Writer.
* @return A WriterLength, containing a new StoreFile.Writer.
@@ -536,6 +560,7 @@ public class HFileOutputFormat2
*
* Sets the output key/value class to match HFileOutputFormat2's requirements
* Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
* PutSortReducer)
+ * Sets the HBase cluster key to load region locations for locality-sensitive output, if enabled
*
* The user should be sure to set the map output value class to either KeyValue or Put before
* running this function.
@@ -543,6 +568,7 @@ public class HFileOutputFormat2
public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
throws IOException {
configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
+ configureRemoteCluster(job, table.getConfiguration());
}
/**
@@ -672,6 +698,50 @@ public class HFileOutputFormat2
LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
}
+ /**
+ * Configure the HBase cluster key of the remote cluster that region locations are loaded from
+ * when locality-sensitive output is enabled.
+ * It is not necessary to call this method explicitly when the job configuration already points
+ * at the cluster whose region locations should be used.
+ * Call this method when the job configuration points at a different HBase cluster.
+ * For example, call it when you read data from HBase cluster A using
+ * {@link TableInputFormat} and generate hfiles for HBase cluster B.
+ * Otherwise, HFileOutputFormat2 fetches region locations from cluster A and the
+ * locality-sensitive output will not work correctly.
+ * {@link #configureIncrementalLoad(Job, Table, RegionLocator)} calls this method using
+ * {@link Table#getConfiguration} as clusterConf.
+ * See HBASE-25608.
+ *
+ * @param job which has configuration to be updated
+ * @param clusterConf which contains the cluster key of the HBase cluster targeted for locality
+ *
+ * @see #configureIncrementalLoad(Job, Table, RegionLocator)
+ * @see #LOCALITY_SENSITIVE_CONF_KEY
+ * @see #REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY
+ * @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
+ * @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
+ */
+ public static void configureRemoteCluster(Job job, Configuration clusterConf) {
+ Configuration conf = job.getConfiguration();
+
+ if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+ return;
+ }
+
+ final String quorum = clusterConf.get(HConstants.ZOOKEEPER_QUORUM);
+ final int clientPort = clusterConf.getInt(
+ HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
+ final String parent = clusterConf.get(
+ HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
+
+ conf.set(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY, quorum);
+ conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
+ conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);
+
+ LOG.info("ZK configs for remote cluster of bulkload is configured: " +
+ quorum + ":" + clientPort + "/" + parent);
+ }
+
/**
* Runs inside the task to deserialize column family to compression algorithm
* map from the configuration.
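
A minimal sketch of the cross-cluster scenario the new javadoc describes: a job reads from cluster A and produces HFiles laid out for cluster B, so the locality-sensitive writers consult cluster B for region locations. This is not code from the patch: CrossClusterBulkLoadSketch, MyMapper, the table names "source" and "target", the output path /bulk/out and the quorum zk-b.example.com are assumed placeholders; only configureIncrementalLoad, configureRemoteCluster and the REMOTE_CLUSTER_* keys come from this change.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CrossClusterBulkLoadSketch {

  // Placeholder mapper; a real job would transform each scanned Result into Puts for cluster B.
  public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {
  }

  public static void main(String[] args) throws Exception {
    // The job configuration points at cluster A, the cluster being scanned.
    Configuration clusterAConf = HBaseConfiguration.create();
    Job job = Job.getInstance(clusterAConf, "cross-cluster-bulkload");
    TableMapReduceUtil.initTableMapperJob("source", new Scan(), MyMapper.class,
        ImmutableBytesWritable.class, Put.class, job);

    // The output side points at cluster B. configureIncrementalLoad(Job, Table, RegionLocator)
    // now also calls configureRemoteCluster(job, table.getConfiguration()), so the
    // locality-sensitive writers look up region locations in cluster B rather than cluster A.
    Configuration clusterBConf = HBaseConfiguration.create();
    clusterBConf.set(HConstants.ZOOKEEPER_QUORUM, "zk-b.example.com");
    try (Connection connB = ConnectionFactory.createConnection(clusterBConf);
        Table table = connB.getTable(TableName.valueOf("target"));
        RegionLocator locator = connB.getRegionLocator(TableName.valueOf("target"))) {
      HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
    }
    FileOutputFormat.setOutputPath(job, new Path("/bulk/out"));

    // When using an overload that takes only a TableDescriptor, the same effect is achieved by
    // calling HFileOutputFormat2.configureRemoteCluster(job, clusterBConf) explicitly, or by
    // setting the three REMOTE_CLUSTER_ZOOKEEPER_*_CONF_KEY properties on the job by hand.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
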
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 3d7ce192267..f94df2574ed 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.mapreduce;
+import static org.apache.hadoop.hbase.client.ConnectionFactory.createConnection;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -35,7 +36,11 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
@@ -59,19 +64,25 @@ import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableBuilder;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -85,6 +96,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
@@ -1555,5 +1567,185 @@ public class TestHFileOutputFormat2 {
}
}
+
+ @Test
+ public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
+ // Start cluster A
+ util = new HBaseTestingUtility();
+ Configuration confA = util.getConfiguration();
+ int hostCount = 3;
+ int regionNum = 20;
+ String[] hostnames = new String[hostCount];
+ for (int i = 0; i < hostCount; ++i) {
+ hostnames[i] = "datanode_" + i;
+ }
+ StartMiniClusterOption option = StartMiniClusterOption.builder()
+ .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
+ util.startMiniCluster(option);
+
+ // Start cluster B
+ HBaseTestingUtility utilB = new HBaseTestingUtility();
+ Configuration confB = utilB.getConfiguration();
+ utilB.startMiniCluster(option);
+
+ Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
+
+ byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
+ TableName tableName = TableName.valueOf("table");
+ // Create table in cluster B
+ try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys);
+ RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
+ // Generate the bulk load files
+ // Job has zookeeper configuration for cluster A
+ // Assume reading from cluster A by TableInputFormat and creating hfiles to cluster B
+ Job job = new Job(confA, "testLocalMRIncrementalLoad");
+ Configuration jobConf = job.getConfiguration();
+ final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
+ job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
+ setupRandomGeneratorMapper(job, false);
+ HFileOutputFormat2.configureIncrementalLoad(job, table, r);
+
+ assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
+ jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
+ assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
+ jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
+ assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
+ jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));
+
+ FileOutputFormat.setOutputPath(job, testDir);
+
+ assertFalse(util.getTestFileSystem().exists(testDir));
+
+ assertTrue(job.waitForCompletion(true));
+
+ final List<Configuration> configs =
+ ConfigurationCaptorConnection.getCapturedConfigurations(key);
+
+ assertFalse(configs.isEmpty());
+ for (Configuration config : configs) {
+ assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
+ config.get(HConstants.ZOOKEEPER_QUORUM));
+ assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
+ config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
+ assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
+ config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+ }
+ } finally {
+ utilB.deleteTable(tableName);
+ testDir.getFileSystem(confA).delete(testDir, true);
+ util.shutdownMiniCluster();
+ utilB.shutdownMiniCluster();
+ }
+ }
+
+ private static class ConfigurationCaptorConnection implements Connection {
+ private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";
+
+ private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();
+
+ private final Connection delegate;
+
+ public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user)
+ throws IOException {
+ Configuration confForDelegate = new Configuration(conf);
+ confForDelegate.unset(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL);
+ delegate = createConnection(confForDelegate, es, user);
+
+ final String uuid = conf.get(UUID_KEY);
+ if (uuid != null) {
+ confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf);
+ }
+ }
+
+ static UUID configureConnectionImpl(Configuration conf) {
+ conf.setClass(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
+ ConfigurationCaptorConnection.class, Connection.class);
+
+ final UUID uuid = UUID.randomUUID();
+ conf.set(UUID_KEY, uuid.toString());
+ return uuid;
+ }
+
+ static List<Configuration> getCapturedConfigurations(UUID key) {
+ return confs.get(key);
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return delegate.getConfiguration();
+ }
+
+ @Override
+ public Table getTable(TableName tableName) throws IOException {
+ return delegate.getTable(tableName);
+ }
+
+ @Override
+ public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
+ return delegate.getTable(tableName, pool);
+ }
+
+ @Override
+ public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
+ return delegate.getBufferedMutator(tableName);
+ }
+
+ @Override
+ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
+ return delegate.getBufferedMutator(params);
+ }
+
+ @Override
+ public RegionLocator getRegionLocator(TableName tableName) throws IOException {
+ return delegate.getRegionLocator(tableName);
+ }
+
+ @Override
+ public void clearRegionLocationCache() {
+ delegate.clearRegionLocationCache();
+ }
+
+ @Override
+ public Admin getAdmin() throws IOException {
+ return delegate.getAdmin();
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return delegate.isClosed();
+ }
+
+ @Override
+ public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
+ return delegate.getTableBuilder(tableName, pool);
+ }
+
+ @Override
+ public Hbck getHbck()
+ throws IOException {
+ return delegate.getHbck();
+ }
+
+ @Override
+ public Hbck getHbck(ServerName masterServer) throws IOException {
+ return delegate.getHbck(masterServer);
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+ delegate.abort(why, e);
+ }
+
+ @Override
+ public boolean isAborted() {
+ return delegate.isAborted();
+ }
+ }
+
}
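
To round out the scenario from the sketch after the main file: once the job completes, the generated HFiles still have to be loaded into cluster B, a step outside the scope of this patch. A fragment sketching that final step, reusing the clusterBConf, "target" table and /bulk/out placeholders assumed earlier, with the LoadIncrementalHFiles tool this test class already imports (the completebulkload command is the CLI equivalent):

// Load the HFiles produced under /bulk/out into the "target" table on cluster B
// (a sketch; the names and paths are the placeholders introduced above).
try (Connection connB = ConnectionFactory.createConnection(clusterBConf);
    Admin admin = connB.getAdmin();
    Table table = connB.getTable(TableName.valueOf("target"));
    RegionLocator locator = connB.getRegionLocator(TableName.valueOf("target"))) {
  new LoadIncrementalHFiles(clusterBConf).doBulkLoad(new Path("/bulk/out"), admin, table, locator);
}
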