Fix Hadoop ingestion property handling when using indexers (#9059)

This commit is contained in:
Jonathan Wei 2019-12-18 12:13:19 -08:00 committed by GitHub
parent b1547a76b1
commit 15884f6d10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 26 additions and 6 deletions

View File

@ -75,6 +75,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
@ -95,7 +96,15 @@ public class HadoopDruidIndexerConfig
static final DataSegmentPusher DATA_SEGMENT_PUSHER;
private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";
/**
* Hadoop tasks running in an Indexer process need a reference to the Properties instance created
* in PropertiesModule so that the task sees properties that were specified in Druid's config files.
*
* This is not strictly necessary for Peon-based tasks which have all properties, including config file properties,
* specified on their command line by ForkingTaskRunner (so they could use System.getProperties() only),
* but we always use the injected Properties for consistency.
*/
public static final Properties PROPERTIES;
static {
INJECTOR = Initialization.makeInjectorWithModules(
@ -117,6 +126,7 @@ public class HadoopDruidIndexerConfig
INDEX_MERGER_V9 = INJECTOR.getInstance(IndexMergerV9.class);
HADOOP_KERBEROS_CONFIG = INJECTOR.getInstance(HadoopKerberosConfig.class);
DATA_SEGMENT_PUSHER = INJECTOR.getInstance(DataSegmentPusher.class);
PROPERTIES = INJECTOR.getInstance(Properties.class);
}
public enum IndexJobCounters

View File

@ -315,11 +315,21 @@ public class JobHelper
String mapJavaOpts = StringUtils.nullToEmptyNonDruidDataString(configuration.get(MRJobConfig.MAP_JAVA_OPTS));
String reduceJavaOpts = StringUtils.nullToEmptyNonDruidDataString(configuration.get(MRJobConfig.REDUCE_JAVA_OPTS));
for (String propName : System.getProperties().stringPropertyNames()) {
for (String propName : HadoopDruidIndexerConfig.PROPERTIES.stringPropertyNames()) {
for (String prefix : listOfAllowedPrefix) {
if (propName.equals(prefix) || propName.startsWith(prefix + ".")) {
mapJavaOpts = StringUtils.format("%s -D%s=%s", mapJavaOpts, propName, System.getProperty(propName));
reduceJavaOpts = StringUtils.format("%s -D%s=%s", reduceJavaOpts, propName, System.getProperty(propName));
mapJavaOpts = StringUtils.format(
"%s -D%s=%s",
mapJavaOpts,
propName,
HadoopDruidIndexerConfig.PROPERTIES.getProperty(propName)
);
reduceJavaOpts = StringUtils.format(
"%s -D%s=%s",
reduceJavaOpts,
propName,
HadoopDruidIndexerConfig.PROPERTIES.getProperty(propName)
);
break;
}
}
@ -335,9 +345,9 @@ public class JobHelper
public static Configuration injectSystemProperties(Configuration conf)
{
for (String propName : System.getProperties().stringPropertyNames()) {
for (String propName : HadoopDruidIndexerConfig.PROPERTIES.stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
conf.set(propName.substring("hadoop.".length()), HadoopDruidIndexerConfig.PROPERTIES.getProperty(propName));
}
}
return conf;