From 07b07300b86385e33f3ab83c2930886935ad882c Mon Sep 17 00:00:00 2001 From: Gary Helmling Date: Wed, 18 Nov 2015 10:25:54 -0800 Subject: [PATCH] HBASE-14821 Allow configuration overrides in TableOutputFormat --- .../hadoop/hbase/HBaseConfiguration.java | 21 +++++++++++++++ .../hadoop/hbase/TestHBaseConfiguration.java | 26 +++++++++++++++++++ .../hbase/mapreduce/TableMapReduceUtil.java | 4 +++ .../hbase/mapreduce/TableOutputFormat.java | 22 +++++++++++++--- 4 files changed, 69 insertions(+), 4 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java index f2fc55ae1fb..505912ee6dd 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java @@ -118,6 +118,27 @@ public class HBaseConfiguration extends Configuration { } } + /** + * Returns a subset of the configuration properties, matching the given key prefix. + * The prefix is stripped from the return keys, ie. when calling with a prefix of "myprefix", + * the entry "myprefix.key1 = value1" would be returned as "key1 = value1". If an entry's + * key matches the prefix exactly ("myprefix = value2"), it will not be + * included in the results, since it would show up as an entry with an empty key. + */ + public static Configuration subset(Configuration srcConf, String prefix) { + Configuration newConf = new Configuration(false); + for (Entry entry : srcConf) { + if (entry.getKey().startsWith(prefix)) { + String newKey = entry.getKey().substring(prefix.length()); + // avoid entries that would produce an empty key + if (!newKey.isEmpty()) { + newConf.set(newKey, entry.getValue()); + } + } + } + return newConf; + } + /** * @return whether to show HBase Configuration in servlet */ diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java index b739f364b62..bbddb602ab8 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; import java.io.IOException; @@ -63,6 +64,31 @@ public class TestHBaseConfiguration { assertEquals(VAL, HBaseConfiguration.getInt(conf, NAME, DEPRECATED_NAME, 0)); } + @Test + public void testSubset() throws Exception { + Configuration conf = HBaseConfiguration.create(); + // subset is used in TableMapReduceUtil#initCredentials to support different security + // configurations between source and destination clusters, so we'll use that as an example + String prefix = "hbase.mapred.output."; + conf.set("hbase.security.authentication", "kerberos"); + conf.set("hbase.regionserver.kerberos.principal", "hbasesource"); + conf.set(prefix + "hbase.regionserver.kerberos.principal", "hbasedest"); + conf.set(prefix, "shouldbemissing"); + + Configuration subsetConf = HBaseConfiguration.subset(conf, prefix); + assertNull(subsetConf.get(prefix + "hbase.regionserver.kerberos.principal")); + assertEquals("hbasedest", subsetConf.get("hbase.regionserver.kerberos.principal")); + assertNull(subsetConf.get("hbase.security.authentication")); + assertNull(subsetConf.get("")); + + Configuration mergedConf = HBaseConfiguration.create(conf); + HBaseConfiguration.merge(mergedConf, subsetConf); + + assertEquals("hbasedest", mergedConf.get("hbase.regionserver.kerberos.principal")); + assertEquals("kerberos", mergedConf.get("hbase.security.authentication")); + assertEquals("shouldbemissing", mergedConf.get(prefix)); + } + @Test public void testGetPassword() throws Exception { Configuration conf = HBaseConfiguration.create(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index 5e94cca8de8..fdd68ce5d5f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -477,6 +477,10 @@ public class TableMapReduceUtil { if (quorumAddress != null) { Configuration peerConf = HBaseConfiguration.create(job.getConfiguration()); ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress); + // apply any "hbase.mapred.output." configuration overrides + Configuration outputOverrides = + HBaseConfiguration.subset(peerConf, TableOutputFormat.OUTPUT_CONF_PREFIX); + HBaseConfiguration.merge(peerConf, outputOverrides); Connection peerConn = ConnectionFactory.createConnection(peerConf); try { TokenUtil.addTokenForJob(peerConn, user, job); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java index 7b2307524eb..190962ecf80 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java @@ -58,23 +58,33 @@ implements Configurable { /** Job parameter that specifies the output table. */ public static final String OUTPUT_TABLE = "hbase.mapred.outputtable"; + /** + * Prefix for configuration property overrides to apply in {@link #setConf(Configuration)}. + * For keys matching this prefix, the prefix is stripped, and the value is set in the + * configuration with the resulting key, ie. the entry "hbase.mapred.output.key1 = value1" + * would be set in the configuration as "key1 = value1". Use this to set properties + * which should only be applied to the {@code TableOutputFormat} configuration and not the + * input configuration. + */ + public static final String OUTPUT_CONF_PREFIX = "hbase.mapred.output."; + /** * Optional job parameter to specify a peer cluster. * Used specifying remote cluster when copying between hbase clusters (the * source is picked up from hbase-site.xml). * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String) */ - public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum"; + public static final String QUORUM_ADDRESS = OUTPUT_CONF_PREFIX + "quorum"; /** Optional job parameter to specify peer cluster's ZK client port */ - public static final String QUORUM_PORT = "hbase.mapred.output.quorum.port"; + public static final String QUORUM_PORT = OUTPUT_CONF_PREFIX + "quorum.port"; /** Optional specification of the rs class name of the peer cluster */ public static final String - REGION_SERVER_CLASS = "hbase.mapred.output.rs.class"; + REGION_SERVER_CLASS = OUTPUT_CONF_PREFIX + "rs.class"; /** Optional specification of the rs impl name of the peer cluster */ public static final String - REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl"; + REGION_SERVER_IMPL = OUTPUT_CONF_PREFIX + "rs.impl"; /** The configuration. */ private Configuration conf = null; @@ -207,5 +217,9 @@ implements Configurable { LOG.error(e); throw new RuntimeException(e); } + + // finally apply any remaining "hbase.mapred.output." configuration overrides + Configuration outputOverrides = HBaseConfiguration.subset(otherConf, OUTPUT_CONF_PREFIX); + HBaseConfiguration.merge(this.conf, outputOverrides); } }