HBASE-25672 Backport HBASE-25608 to branch-1 (#3068)
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
f8078009e9
commit
716c685497
|
@ -125,6 +125,13 @@ public class HFileOutputFormat2
|
||||||
public static final String OUTPUT_TABLE_NAME_CONF_KEY =
|
public static final String OUTPUT_TABLE_NAME_CONF_KEY =
|
||||||
"hbase.mapreduce.hfileoutputformat.table.name";
|
"hbase.mapreduce.hfileoutputformat.table.name";
|
||||||
|
|
||||||
|
public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
|
||||||
|
"hbase.hfileoutputformat.remote.cluster.zookeeper.quorum";
|
||||||
|
public static final String REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY =
|
||||||
|
"hbase.hfileoutputformat.remote.cluster.zookeeper." + HConstants.CLIENT_PORT_STR;
|
||||||
|
public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY =
|
||||||
|
"hbase.hfileoutputformat.remote.cluster." + HConstants.ZOOKEEPER_ZNODE_PARENT;
|
||||||
|
|
||||||
public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
|
public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
|
||||||
public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
|
public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
|
||||||
|
|
||||||
|
@ -223,7 +230,8 @@ public class HFileOutputFormat2
|
||||||
HRegionLocation loc = null;
|
HRegionLocation loc = null;
|
||||||
String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
|
String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
|
||||||
if (tableName != null) {
|
if (tableName != null) {
|
||||||
try (Connection connection = ConnectionFactory.createConnection(conf);
|
try (Connection connection = ConnectionFactory.createConnection(
|
||||||
|
createRemoteClusterConf(conf));
|
||||||
RegionLocator locator =
|
RegionLocator locator =
|
||||||
connection.getRegionLocator(TableName.valueOf(tableName))) {
|
connection.getRegionLocator(TableName.valueOf(tableName))) {
|
||||||
loc = locator.getRegionLocation(rowKey);
|
loc = locator.getRegionLocation(rowKey);
|
||||||
|
@ -286,6 +294,22 @@ public class HFileOutputFormat2
|
||||||
this.rollRequested = false;
|
this.rollRequested = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Configuration createRemoteClusterConf(Configuration conf) {
|
||||||
|
final Configuration newConf = new Configuration(conf);
|
||||||
|
|
||||||
|
final String quorum = conf.get(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY);
|
||||||
|
final String clientPort = conf.get(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY);
|
||||||
|
final String parent = conf.get(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY);
|
||||||
|
|
||||||
|
if (quorum != null && clientPort != null && parent != null) {
|
||||||
|
newConf.set(HConstants.ZOOKEEPER_QUORUM, quorum);
|
||||||
|
newConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(clientPort));
|
||||||
|
newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parent);
|
||||||
|
}
|
||||||
|
|
||||||
|
return newConf;
|
||||||
|
}
|
||||||
|
|
||||||
/* Create a new StoreFile.Writer.
|
/* Create a new StoreFile.Writer.
|
||||||
* @param family
|
* @param family
|
||||||
* @return A WriterLength, containing a new StoreFile.Writer.
|
* @return A WriterLength, containing a new StoreFile.Writer.
|
||||||
|
@ -477,6 +501,7 @@ public class HFileOutputFormat2
|
||||||
* <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
|
* <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
|
||||||
* <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
|
* <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
|
||||||
* PutSortReducer)</li>
|
* PutSortReducer)</li>
|
||||||
|
* <li>Sets the HBase cluster key to load region locations for locality-sensitive</li>
|
||||||
* </ul>
|
* </ul>
|
||||||
* The user should be sure to set the map output value class to either KeyValue or Put before
|
* The user should be sure to set the map output value class to either KeyValue or Put before
|
||||||
* running this function.
|
* running this function.
|
||||||
|
@ -484,6 +509,7 @@ public class HFileOutputFormat2
|
||||||
public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
|
public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
|
configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
|
||||||
|
configureRemoteCluster(job, table.getConfiguration());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -574,6 +600,50 @@ public class HFileOutputFormat2
|
||||||
LOG.info("Incremental table " + table.getName() + " output configured.");
|
LOG.info("Incremental table " + table.getName() + " output configured.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configure HBase cluster key for remote cluster to load region location for locality-sensitive
|
||||||
|
* if it's enabled.
|
||||||
|
* It's not necessary to call this method explicitly when the cluster key for HBase cluster to be
|
||||||
|
* used to load region location is configured in the job configuration.
|
||||||
|
* Call this method when another HBase cluster key is configured in the job configuration.
|
||||||
|
* For example, you should call when you load data from HBase cluster A using
|
||||||
|
* {@link TableInputFormat} and generate hfiles for HBase cluster B.
|
||||||
|
* Otherwise, HFileOutputFormat2 fetch location from cluster A and locality-sensitive won't
|
||||||
|
* working correctly.
|
||||||
|
* {@link #configureIncrementalLoad(Job, Table, RegionLocator)} calls this method using
|
||||||
|
* {@link Table#getConfiguration} as clusterConf.
|
||||||
|
* See HBASE-25608.
|
||||||
|
*
|
||||||
|
* @param job which has configuration to be updated
|
||||||
|
* @param clusterConf which contains cluster key of the HBase cluster to be locality-sensitive
|
||||||
|
*
|
||||||
|
* @see #configureIncrementalLoad(Job, Table, RegionLocator)
|
||||||
|
* @see #LOCALITY_SENSITIVE_CONF_KEY
|
||||||
|
* @see #REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY
|
||||||
|
* @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
|
||||||
|
* @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
|
||||||
|
*/
|
||||||
|
public static void configureRemoteCluster(Job job, Configuration clusterConf) {
|
||||||
|
Configuration conf = job.getConfiguration();
|
||||||
|
|
||||||
|
if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final String quorum = clusterConf.get(HConstants.ZOOKEEPER_QUORUM);
|
||||||
|
final int clientPort = clusterConf.getInt(
|
||||||
|
HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
|
||||||
|
final String parent = clusterConf.get(
|
||||||
|
HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
|
||||||
|
|
||||||
|
conf.set(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY, quorum);
|
||||||
|
conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
|
||||||
|
conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);
|
||||||
|
|
||||||
|
LOG.info("ZK configs for remote cluster of bulkload is configured: " +
|
||||||
|
quorum + ":" + clientPort + "/" + parent);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs inside the task to deserialize column family to compression algorithm
|
* Runs inside the task to deserialize column family to compression algorithm
|
||||||
* map from the configuration.
|
* map from the configuration.
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.mapreduce;
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.client.ConnectionFactory.createConnection;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
@ -35,7 +36,11 @@ import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.UUID;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -62,8 +67,10 @@ import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.Tag;
|
import org.apache.hadoop.hbase.Tag;
|
||||||
import org.apache.hadoop.hbase.TagType;
|
import org.apache.hadoop.hbase.TagType;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.client.BufferedMutator;
|
||||||
|
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
import org.apache.hadoop.hbase.client.HConnection;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||||
|
@ -78,17 +85,16 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
|
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
|
||||||
import org.apache.hadoop.hbase.mapreduce.TestImportTSVWithTTLs.TTLCheckingObserver;
|
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.Store;
|
import org.apache.hadoop.hbase.regionserver.Store;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||||
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
|
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
|
||||||
|
import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
|
||||||
import org.apache.hadoop.hbase.util.Writables;
|
import org.apache.hadoop.hbase.util.Writables;
|
||||||
import org.apache.hadoop.io.NullWritable;
|
import org.apache.hadoop.io.NullWritable;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
@ -103,8 +109,6 @@ import org.junit.experimental.categories.Category;
|
||||||
import org.junit.rules.TestRule;
|
import org.junit.rules.TestRule;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Simple test for {@link CellSortReducer} and {@link HFileOutputFormat2}.
|
* Simple test for {@link CellSortReducer} and {@link HFileOutputFormat2}.
|
||||||
* Sets up and runs a mapreduce job that writes hfile output.
|
* Sets up and runs a mapreduce job that writes hfile output.
|
||||||
|
@ -1097,7 +1101,7 @@ public class TestHFileOutputFormat2 {
|
||||||
generateRandomStartKeys(5);
|
generateRandomStartKeys(5);
|
||||||
util.setJobWithoutMRCluster();
|
util.setJobWithoutMRCluster();
|
||||||
util.startMiniCluster();
|
util.startMiniCluster();
|
||||||
try (Connection conn = ConnectionFactory.createConnection();
|
try (Connection conn = createConnection();
|
||||||
Admin admin = conn.getAdmin()) {
|
Admin admin = conn.getAdmin()) {
|
||||||
final FileSystem fs = util.getDFSCluster().getFileSystem();
|
final FileSystem fs = util.getDFSCluster().getFileSystem();
|
||||||
HTable table = util.createTable(TABLE_NAME, FAMILIES);
|
HTable table = util.createTable(TABLE_NAME, FAMILIES);
|
||||||
|
@ -1177,7 +1181,7 @@ public class TestHFileOutputFormat2 {
|
||||||
generateRandomStartKeys(5);
|
generateRandomStartKeys(5);
|
||||||
util.setJobWithoutMRCluster();
|
util.setJobWithoutMRCluster();
|
||||||
util.startMiniCluster();
|
util.startMiniCluster();
|
||||||
try (Connection conn = ConnectionFactory.createConnection(conf);
|
try (Connection conn = createConnection(conf);
|
||||||
Admin admin = conn.getAdmin()){
|
Admin admin = conn.getAdmin()){
|
||||||
Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
|
Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
|
||||||
final FileSystem fs = util.getDFSCluster().getFileSystem();
|
final FileSystem fs = util.getDFSCluster().getFileSystem();
|
||||||
|
@ -1276,7 +1280,7 @@ public class TestHFileOutputFormat2 {
|
||||||
}
|
}
|
||||||
} else if ("incremental".equals(args[0])) {
|
} else if ("incremental".equals(args[0])) {
|
||||||
TableName tname = TableName.valueOf(args[1]);
|
TableName tname = TableName.valueOf(args[1]);
|
||||||
try(Connection c = ConnectionFactory.createConnection(conf);
|
try(Connection c = createConnection(conf);
|
||||||
Admin admin = c.getAdmin();
|
Admin admin = c.getAdmin();
|
||||||
RegionLocator regionLocator = c.getRegionLocator(tname)) {
|
RegionLocator regionLocator = c.getRegionLocator(tname)) {
|
||||||
Path outDir = new Path("incremental-out");
|
Path outDir = new Path("incremental-out");
|
||||||
|
@ -1330,5 +1334,173 @@ public class TestHFileOutputFormat2 {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
|
||||||
|
// Start cluster A
|
||||||
|
util = new HBaseTestingUtility();
|
||||||
|
Configuration confA = util.getConfiguration();
|
||||||
|
int hostCount = 3;
|
||||||
|
int regionNum = 20;
|
||||||
|
String[] hostnames = new String[hostCount];
|
||||||
|
for (int i = 0; i < hostCount; ++i) {
|
||||||
|
hostnames[i] = "datanode_" + i;
|
||||||
|
}
|
||||||
|
util.setJobWithoutMRCluster();
|
||||||
|
util.startMiniCluster(1, hostCount, hostnames);
|
||||||
|
|
||||||
|
// Start cluster B
|
||||||
|
HBaseTestingUtility utilB = new HBaseTestingUtility();
|
||||||
|
Configuration confB = utilB.getConfiguration();
|
||||||
|
utilB.startMiniCluster(1, hostCount, hostnames);
|
||||||
|
|
||||||
|
Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
|
||||||
|
|
||||||
|
byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
|
||||||
|
TableName tableName = TableName.valueOf("table");
|
||||||
|
// Create table in cluster B
|
||||||
|
try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys);
|
||||||
|
RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
|
||||||
|
// Generate the bulk load files
|
||||||
|
// Job has zookeeper configuration for cluster A
|
||||||
|
// Assume reading from cluster A by TableInputFormat and creating hfiles to cluster B
|
||||||
|
Job job = new Job(confA, "testLocalMRIncrementalLoad");
|
||||||
|
Configuration jobConf = job.getConfiguration();
|
||||||
|
final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
|
||||||
|
job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
|
||||||
|
setupRandomGeneratorMapper(job, false);
|
||||||
|
HFileOutputFormat2.configureIncrementalLoad(job, table, r);
|
||||||
|
|
||||||
|
assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
|
||||||
|
jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
|
||||||
|
assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
|
||||||
|
jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
|
||||||
|
assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
|
||||||
|
jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));
|
||||||
|
|
||||||
|
FileOutputFormat.setOutputPath(job, testDir);
|
||||||
|
|
||||||
|
assertFalse(util.getTestFileSystem().exists(testDir));
|
||||||
|
|
||||||
|
assertTrue(job.waitForCompletion(true));
|
||||||
|
|
||||||
|
final List<Configuration> configs =
|
||||||
|
ConfigurationCaptorConnection.getCapturedConfigarutions(key);
|
||||||
|
|
||||||
|
assertFalse(configs.isEmpty());
|
||||||
|
for (Configuration config : configs) {
|
||||||
|
assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
|
||||||
|
config.get(HConstants.ZOOKEEPER_QUORUM));
|
||||||
|
assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
|
||||||
|
config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
|
||||||
|
assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
|
||||||
|
config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
utilB.deleteTable(tableName);
|
||||||
|
testDir.getFileSystem(confA).delete(testDir, true);
|
||||||
|
util.shutdownMiniCluster();
|
||||||
|
utilB.shutdownMiniCluster();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class ConfigurationCaptorConnection implements Connection {
|
||||||
|
private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";
|
||||||
|
|
||||||
|
private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private final Connection delegate;
|
||||||
|
|
||||||
|
public ConfigurationCaptorConnection(
|
||||||
|
Configuration conf, boolean managed, ExecutorService es, User user)
|
||||||
|
throws IOException {
|
||||||
|
Configuration confForDelegate = new Configuration(conf);
|
||||||
|
confForDelegate.unset(HConnection.HBASE_CLIENT_CONNECTION_IMPL);
|
||||||
|
delegate = createConnection(confForDelegate, es, user);
|
||||||
|
|
||||||
|
final String uuid = conf.get(UUID_KEY);
|
||||||
|
if (uuid != null) {
|
||||||
|
final UUID key = UUID.fromString(uuid);
|
||||||
|
List<Configuration> configurations = confs.get(key);
|
||||||
|
if (configurations == null) {
|
||||||
|
configurations = new CopyOnWriteArrayList<>();
|
||||||
|
confs.put(key, configurations);
|
||||||
|
}
|
||||||
|
configurations.add(conf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static UUID configureConnectionImpl(Configuration conf) {
|
||||||
|
conf.setClass(HConnection.HBASE_CLIENT_CONNECTION_IMPL,
|
||||||
|
ConfigurationCaptorConnection.class, Connection.class);
|
||||||
|
|
||||||
|
final UUID uuid = UUID.randomUUID();
|
||||||
|
conf.set(UUID_KEY, uuid.toString());
|
||||||
|
return uuid;
|
||||||
|
}
|
||||||
|
|
||||||
|
static List<Configuration> getCapturedConfigarutions(UUID key) {
|
||||||
|
return confs.get(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Configuration getConfiguration() {
|
||||||
|
return delegate.getConfiguration();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Table getTable(TableName tableName) throws IOException {
|
||||||
|
return delegate.getTable(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
|
||||||
|
return delegate.getTable(tableName, pool);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
|
||||||
|
return delegate.getBufferedMutator(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BufferedMutator getBufferedMutator(BufferedMutatorParams params)
|
||||||
|
throws IOException {
|
||||||
|
return delegate.getBufferedMutator(params);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RegionLocator getRegionLocator(TableName tableName) throws IOException {
|
||||||
|
return delegate.getRegionLocator(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Admin getAdmin() throws IOException {
|
||||||
|
return delegate.getAdmin();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getClusterId() throws IOException {
|
||||||
|
return delegate.getClusterId();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
delegate.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isClosed() {
|
||||||
|
return delegate.isClosed();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void abort(String why, Throwable e) {
|
||||||
|
delegate.abort(why, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAborted() {
|
||||||
|
return delegate.isAborted();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue