HBASE-14821 Allow configuration overrides in TableOutputFormat

This commit is contained in:
Gary Helmling 2015-11-18 10:25:54 -08:00 committed by Gary Helmling
parent 4cc341b9c2
commit 8b67df6948
4 changed files with 69 additions and 4 deletions

View File

@ -120,6 +120,27 @@ public class HBaseConfiguration extends Configuration {
} }
} }
/**
* Returns a subset of the configuration properties, matching the given key prefix.
* The prefix is stripped from the return keys, ie. when calling with a prefix of "myprefix",
* the entry "myprefix.key1 = value1" would be returned as "key1 = value1". If an entry's
* key matches the prefix exactly ("myprefix = value2"), it will <strong>not</strong> be
* included in the results, since it would show up as an entry with an empty key.
*/
public static Configuration subset(Configuration srcConf, String prefix) {
Configuration newConf = new Configuration(false);
for (Entry<String, String> entry : srcConf) {
if (entry.getKey().startsWith(prefix)) {
String newKey = entry.getKey().substring(prefix.length());
// avoid entries that would produce an empty key
if (!newKey.isEmpty()) {
newConf.set(newKey, entry.getValue());
}
}
}
return newConf;
}
/** /**
* @return whether to show HBase Configuration in servlet * @return whether to show HBase Configuration in servlet
*/ */

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase; package org.apache.hadoop.hbase;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
@ -64,6 +65,31 @@ public class TestHBaseConfiguration {
assertEquals(VAL, HBaseConfiguration.getInt(conf, NAME, DEPRECATED_NAME, 0)); assertEquals(VAL, HBaseConfiguration.getInt(conf, NAME, DEPRECATED_NAME, 0));
} }
@Test
public void testSubset() throws Exception {
Configuration conf = HBaseConfiguration.create();
// subset is used in TableMapReduceUtil#initCredentials to support different security
// configurations between source and destination clusters, so we'll use that as an example
String prefix = "hbase.mapred.output.";
conf.set("hbase.security.authentication", "kerberos");
conf.set("hbase.regionserver.kerberos.principal", "hbasesource");
conf.set(prefix + "hbase.regionserver.kerberos.principal", "hbasedest");
conf.set(prefix, "shouldbemissing");
Configuration subsetConf = HBaseConfiguration.subset(conf, prefix);
assertNull(subsetConf.get(prefix + "hbase.regionserver.kerberos.principal"));
assertEquals("hbasedest", subsetConf.get("hbase.regionserver.kerberos.principal"));
assertNull(subsetConf.get("hbase.security.authentication"));
assertNull(subsetConf.get(""));
Configuration mergedConf = HBaseConfiguration.create(conf);
HBaseConfiguration.merge(mergedConf, subsetConf);
assertEquals("hbasedest", mergedConf.get("hbase.regionserver.kerberos.principal"));
assertEquals("kerberos", mergedConf.get("hbase.security.authentication"));
assertEquals("shouldbemissing", mergedConf.get(prefix));
}
@Test @Test
public void testGetPassword() throws Exception { public void testGetPassword() throws Exception {
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();

View File

@ -487,6 +487,10 @@ public class TableMapReduceUtil {
if (quorumAddress != null) { if (quorumAddress != null) {
Configuration peerConf = HBaseConfiguration.create(job.getConfiguration()); Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress); ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
// apply any "hbase.mapred.output." configuration overrides
Configuration outputOverrides =
HBaseConfiguration.subset(peerConf, TableOutputFormat.OUTPUT_CONF_PREFIX);
HBaseConfiguration.merge(peerConf, outputOverrides);
Connection peerConn = ConnectionFactory.createConnection(peerConf); Connection peerConn = ConnectionFactory.createConnection(peerConf);
try { try {
TokenUtil.addTokenForJob(peerConn, user, job); TokenUtil.addTokenForJob(peerConn, user, job);

View File

@ -57,23 +57,33 @@ implements Configurable {
/** Job parameter that specifies the output table. */ /** Job parameter that specifies the output table. */
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable"; public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
/**
* Prefix for configuration property overrides to apply in {@link #setConf(Configuration)}.
* For keys matching this prefix, the prefix is stripped, and the value is set in the
* configuration with the resulting key, ie. the entry "hbase.mapred.output.key1 = value1"
* would be set in the configuration as "key1 = value1". Use this to set properties
* which should only be applied to the {@code TableOutputFormat} configuration and not the
* input configuration.
*/
public static final String OUTPUT_CONF_PREFIX = "hbase.mapred.output.";
/** /**
* Optional job parameter to specify a peer cluster. * Optional job parameter to specify a peer cluster.
* Used specifying remote cluster when copying between hbase clusters (the * Used specifying remote cluster when copying between hbase clusters (the
* source is picked up from <code>hbase-site.xml</code>). * source is picked up from <code>hbase-site.xml</code>).
* @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String) * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String)
*/ */
public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum"; public static final String QUORUM_ADDRESS = OUTPUT_CONF_PREFIX + "quorum";
/** Optional job parameter to specify peer cluster's ZK client port */ /** Optional job parameter to specify peer cluster's ZK client port */
public static final String QUORUM_PORT = "hbase.mapred.output.quorum.port"; public static final String QUORUM_PORT = OUTPUT_CONF_PREFIX + "quorum.port";
/** Optional specification of the rs class name of the peer cluster */ /** Optional specification of the rs class name of the peer cluster */
public static final String public static final String
REGION_SERVER_CLASS = "hbase.mapred.output.rs.class"; REGION_SERVER_CLASS = OUTPUT_CONF_PREFIX + "rs.class";
/** Optional specification of the rs impl name of the peer cluster */ /** Optional specification of the rs impl name of the peer cluster */
public static final String public static final String
REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl"; REGION_SERVER_IMPL = OUTPUT_CONF_PREFIX + "rs.impl";
/** The configuration. */ /** The configuration. */
private Configuration conf = null; private Configuration conf = null;
@ -211,5 +221,9 @@ implements Configurable {
LOG.error(e); LOG.error(e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
// finally apply any remaining "hbase.mapred.output." configuration overrides
Configuration outputOverrides = HBaseConfiguration.subset(otherConf, OUTPUT_CONF_PREFIX);
HBaseConfiguration.merge(this.conf, outputOverrides);
} }
} }