HBASE-25608 Support locality-sensitive HFileOutputFormat even when the destination cluster is different from the source cluster (#2988)

Signed-off-by: stack <stack@duboce.net>
This commit is contained in:
bitterfox 2021-03-17 13:47:36 +09:00 committed by GitHub
parent bcf503e6c2
commit 976629c046
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 272 additions and 1 deletions

View File

@ -163,6 +163,13 @@ public class HFileOutputFormat2
static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY = static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
"hbase.mapreduce.use.multi.table.hfileoutputformat"; "hbase.mapreduce.use.multi.table.hfileoutputformat";
/**
 * Job configuration key holding the ZooKeeper quorum of the remote cluster.
 * Written by {@code configureRemoteCluster} and copied onto
 * {@link HConstants#ZOOKEEPER_QUORUM} by {@code createRemoteClusterConf}.
 */
public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
"hbase.hfileoutputformat.remote.cluster.zookeeper.quorum";
/**
 * Job configuration key holding the ZooKeeper client port of the remote cluster.
 * Written by {@code configureRemoteCluster} and copied onto
 * {@link HConstants#ZOOKEEPER_CLIENT_PORT} by {@code createRemoteClusterConf}.
 */
public static final String REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY =
"hbase.hfileoutputformat.remote.cluster.zookeeper." + HConstants.CLIENT_PORT_STR;
/**
 * Job configuration key holding the parent znode of the remote cluster.
 * Written by {@code configureRemoteCluster} and copied onto
 * {@link HConstants#ZOOKEEPER_ZNODE_PARENT} by {@code createRemoteClusterConf}.
 */
public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY =
"hbase.hfileoutputformat.remote.cluster." + HConstants.ZOOKEEPER_ZNODE_PARENT;
public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY; public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + "."; public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
@ -274,7 +281,8 @@ public class HFileOutputFormat2
HRegionLocation loc = null; HRegionLocation loc = null;
String tableName = Bytes.toString(tableNameBytes); String tableName = Bytes.toString(tableNameBytes);
if (tableName != null) { if (tableName != null) {
try (Connection connection = ConnectionFactory.createConnection(conf); try (Connection connection = ConnectionFactory.createConnection(
createRemoteClusterConf(conf));
RegionLocator locator = RegionLocator locator =
connection.getRegionLocator(TableName.valueOf(tableName))) { connection.getRegionLocator(TableName.valueOf(tableName))) {
loc = locator.getRegionLocation(rowKey); loc = locator.getRegionLocation(rowKey);
@ -340,6 +348,22 @@ public class HFileOutputFormat2
wl.written = 0; wl.written = 0;
} }
/**
 * Builds the configuration used to open the connection that looks up region locations.
 * <p>
 * The result is always a copy of {@code conf}. When all three remote-cluster keys
 * (quorum, client port and parent znode) are present in {@code conf}, their values
 * replace the ZooKeeper quorum, client port and parent znode of the copy; when any of
 * them is missing the copy is returned unchanged.
 *
 * @param conf the configuration to copy and to read the remote-cluster keys from
 * @return a new configuration, possibly pointing at the remote cluster's ZooKeeper
 * @throws NumberFormatException if the remote client port value is not a valid integer
 */
private Configuration createRemoteClusterConf(Configuration conf) {
  final Configuration remoteConf = new Configuration(conf);

  final String remoteQuorum = conf.get(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY);
  final String remotePort = conf.get(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY);
  final String remoteZnodeParent = conf.get(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY);

  // The override is all-or-nothing: a partially configured remote cluster is ignored.
  final boolean incomplete =
    remoteQuorum == null || remotePort == null || remoteZnodeParent == null;
  if (incomplete) {
    return remoteConf;
  }

  remoteConf.set(HConstants.ZOOKEEPER_QUORUM, remoteQuorum);
  remoteConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(remotePort));
  remoteConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, remoteZnodeParent);
  return remoteConf;
}
/* /*
* Create a new StoreFile.Writer. * Create a new StoreFile.Writer.
* @return A WriterLength, containing a new StoreFile.Writer. * @return A WriterLength, containing a new StoreFile.Writer.
@ -518,6 +542,7 @@ public class HFileOutputFormat2
* <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li> * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
* <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
* PutSortReducer)</li> * PutSortReducer)</li>
* <li>Sets the HBase cluster key used to load region locations when locality-sensitive output is enabled</li>
* </ul> * </ul>
* The user should be sure to set the map output value class to either KeyValue or Put before * The user should be sure to set the map output value class to either KeyValue or Put before
* running this function. * running this function.
@ -525,6 +550,7 @@ public class HFileOutputFormat2
public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator) public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
throws IOException { throws IOException {
configureIncrementalLoad(job, table.getDescriptor(), regionLocator); configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
configureRemoteCluster(job, table.getConfiguration());
} }
/** /**
@ -657,6 +683,50 @@ public class HFileOutputFormat2
LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured."); LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
} }
/**
 * Records in the job configuration the cluster key of the HBase cluster whose region
 * locations should be used for locality-sensitive HFile output.
 * <p>
 * Does nothing when locality-sensitive output is disabled in the job configuration.
 * <p>
 * Calling this method explicitly is not necessary when the job configuration already
 * carries the cluster key of the HBase cluster that region locations should be loaded from.
 * Call it when the job configuration carries the cluster key of a different HBase cluster.
 * For example, call it when you read data from HBase cluster A using
 * {@link TableInputFormat} and generate hfiles for HBase cluster B.
 * Otherwise, HFileOutputFormat2 fetches region locations from cluster A and
 * locality-sensitive output does not work correctly.
 * <p>
 * {@link #configureIncrementalLoad(Job, Table, RegionLocator)} calls this method with
 * {@link Table#getConfiguration} as {@code clusterConf}.
 * See HBASE-25608.
 *
 * @param job the job whose configuration is updated
 * @param clusterConf configuration containing the cluster key of the HBase cluster that
 *   locality should be computed against
 *
 * @see #configureIncrementalLoad(Job, Table, RegionLocator)
 * @see #LOCALITY_SENSITIVE_CONF_KEY
 * @see #REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY
 * @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
 * @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
 */
public static void configureRemoteCluster(Job job, Configuration clusterConf) {
  final Configuration jobConf = job.getConfiguration();
  final boolean localitySensitive =
    jobConf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE);

  if (localitySensitive) {
    // Read the three parts of the cluster key; port and parent fall back to HBase defaults.
    final String zkQuorum = clusterConf.get(HConstants.ZOOKEEPER_QUORUM);
    final int zkClientPort = clusterConf.getInt(
      HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
    final String zkParent = clusterConf.get(
      HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);

    // Store them under the remote-cluster keys so tasks can rebuild the connection config.
    jobConf.set(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY, zkQuorum);
    jobConf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, zkClientPort);
    jobConf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, zkParent);

    final String message = "ZK configs for remote cluster of bulkload is configured: " +
      zkQuorum + ":" + zkClientPort + "/" + zkParent;
    LOG.info(message);
  }
}
/** /**
* Runs inside the task to deserialize column family to compression algorithm * Runs inside the task to deserialize column family to compression algorithm
* map from the configuration. * map from the configuration.

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hbase.mapreduce; package org.apache.hadoop.hbase.mapreduce;
import static org.apache.hadoop.hbase.client.ConnectionFactory.createAsyncConnection;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@ -38,7 +39,11 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -60,21 +65,28 @@ import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation; import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption; import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@ -90,12 +102,14 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem; import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles; import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
@ -1635,5 +1649,192 @@ public class TestHFileOutputFormat2 {
} }
/**
 * Checks that a job built from cluster A's configuration but configured for a table of
 * cluster B records cluster B's ZooKeeper settings under the remote-cluster keys, and that
 * every connection configuration captured while the job ran points at cluster B.
 */
@Test
public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
// Start cluster A
util = new HBaseTestingUtility();
Configuration confA = util.getConfiguration();
int hostCount = 3;
int regionNum = 20;
String[] hostnames = new String[hostCount];
for (int i = 0; i < hostCount; ++i) {
hostnames[i] = "datanode_" + i;
}
// The same start option (region server count and data node host names) is used for both clusters.
StartMiniClusterOption option = StartMiniClusterOption.builder()
.numRegionServers(hostCount).dataNodeHosts(hostnames).build();
util.startMiniCluster(option);
// Start cluster B
HBaseTestingUtility utilB = new HBaseTestingUtility();
Configuration confB = utilB.getConfiguration();
utilB.startMiniCluster(option);
// The job output directory lives on cluster A's test file system.
Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
TableName tableName = TableName.valueOf("table");
// Create table in cluster B
try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys);
RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
// Generate the bulk load files
// Job has zookeeper configuration for cluster A
// Assume reading from cluster A by TableInputFormat and creating hfiles to cluster B
Job job = new Job(confA, "testLocalMRIncrementalLoad");
Configuration jobConf = job.getConfiguration();
// Install the capturing connection implementation and tag the job conf with a fresh UUID
// so the configurations captured for this job can be retrieved afterwards.
final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
setupRandomGeneratorMapper(job, false);
HFileOutputFormat2.configureIncrementalLoad(job, table, r);
// The table comes from cluster B, so the remote-cluster keys must carry cluster B's settings.
assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));
FileOutputFormat.setOutputPath(job, testDir);
assertFalse(util.getTestFileSystem().exists(testDir));
assertTrue(job.waitForCompletion(true));
// At least one connection must have been captured for this job, and each captured
// configuration must point at cluster B's ZooKeeper.
final List<Configuration> configs =
ConfigurationCaptorConnection.getCapturedConfigarutions(key);
assertFalse(configs.isEmpty());
for (Configuration config : configs) {
assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
config.get(HConstants.ZOOKEEPER_QUORUM));
assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
}
} finally {
// NOTE(review): these cleanup steps run sequentially; if an earlier one throws, the
// later ones (including the cluster shutdowns) are skipped.
utilB.deleteTable(tableName);
testDir.getFileSystem(confA).delete(testDir, true);
util.shutdownMiniCluster();
utilB.shutdownMiniCluster();
}
}
/**
 * A {@link Connection} that forwards every call to a real connection and, on construction,
 * records the {@link Configuration} it was created with.
 * <p>
 * Captured configurations are grouped by the UUID stored under {@link #UUID_KEY} in the
 * configuration; a configuration without that key is not recorded. Use
 * {@link #configureConnectionImpl(Configuration)} to install this class as the connection
 * implementation and obtain the UUID, then {@link #getCapturedConfigarutions(UUID)} to read
 * back what was captured.
 */
private static class ConfigurationCaptorConnection implements Connection {
  /** Configuration key carrying the UUID that captured configurations are grouped by. */
  private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";

  /** Captured configurations, keyed by the UUID found in each configuration. */
  private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();

  /** The real connection that all {@link Connection} calls are forwarded to. */
  private final Connection delegate;

  /**
   * Opens the delegate connection from {@code conf} and records {@code conf} when it carries
   * a UUID under {@link #UUID_KEY}.
   *
   * @param conf configuration to connect with and to capture
   * @param es not used by this class; presumably required by the constructor shape the
   *   connection factory looks up (TODO confirm)
   * @param user user the delegate connection is created for
   * @throws IOException if the delegate connection cannot be created
   */
  public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user)
    throws IOException {
    delegate = FutureUtils.get(createAsyncConnection(conf, user)).toConnection();

    final String uuid = conf.get(UUID_KEY);
    if (uuid != null) {
      confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf);
    }
  }

  /**
   * Registers this class as the connection implementation in {@code conf} and tags
   * {@code conf} with a freshly generated UUID.
   *
   * @param conf configuration to modify
   * @return the UUID under which configurations created from {@code conf} are captured
   */
  static UUID configureConnectionImpl(Configuration conf) {
    conf.setClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
      ConfigurationCaptorConnection.class, Connection.class);

    final UUID uuid = UUID.randomUUID();
    conf.set(UUID_KEY, uuid.toString());
    return uuid;
  }

  /**
   * Returns the configurations captured for {@code key}.
   * <p>
   * The (misspelled) method name is kept as-is because callers reference it.
   *
   * @param key UUID returned by {@link #configureConnectionImpl(Configuration)}
   * @return the captured configurations, or an empty list when none were captured for
   *   {@code key}; never {@code null}
   */
  static List<Configuration> getCapturedConfigarutions(UUID key) {
    // An empty list lets callers assert on emptiness instead of hitting a NullPointerException.
    return confs.getOrDefault(key, java.util.Collections.<Configuration>emptyList());
  }

  @Override
  public Configuration getConfiguration() {
    return delegate.getConfiguration();
  }

  @Override
  public Table getTable(TableName tableName) throws IOException {
    return delegate.getTable(tableName);
  }

  @Override
  public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
    return delegate.getTable(tableName, pool);
  }

  @Override
  public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
    return delegate.getBufferedMutator(tableName);
  }

  @Override
  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
    return delegate.getBufferedMutator(params);
  }

  @Override
  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
    return delegate.getRegionLocator(tableName);
  }

  @Override
  public void clearRegionLocationCache() {
    delegate.clearRegionLocationCache();
  }

  @Override
  public Admin getAdmin() throws IOException {
    return delegate.getAdmin();
  }

  @Override
  public void close() throws IOException {
    delegate.close();
  }

  @Override
  public boolean isClosed() {
    return delegate.isClosed();
  }

  @Override
  public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
    return delegate.getTableBuilder(tableName, pool);
  }

  @Override
  public AsyncConnection toAsyncConnection() {
    return delegate.toAsyncConnection();
  }

  @Override
  public String getClusterId() {
    return delegate.getClusterId();
  }

  @Override
  public Hbck getHbck()
    throws IOException {
    return delegate.getHbck();
  }

  @Override
  public Hbck getHbck(ServerName masterServer) throws IOException {
    return delegate.getHbck(masterServer);
  }

  @Override
  public void abort(String why, Throwable e) {
    delegate.abort(why, e);
  }

  @Override
  public boolean isAborted() {
    return delegate.isAborted();
  }
}
} }