fix Hadoop ingestion failing with a 'JavaScript is disabled' error on certain configs (#9553)

* fix Hadoop ingestion failing with a 'JavaScript is disabled' error when a determine-partitions Hadoop job is run

* add test

* fix checkstyle

* address comments

* address comments
Maytas Monsereenusorn 2020-03-23 23:09:21 -07:00 committed by GitHub
parent 57018adf23
commit e97695d9da
6 changed files with 120 additions and 41 deletions
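In short: the determine-partitions jobs used to call JobHelper.injectSystemProperties(job), which only copied "hadoop."-prefixed system properties into the job Configuration, while whitelisted Druid properties were only turned into -D JVM options for the map and reduce tasks (injectDruidProperties, called from IndexGeneratorJob). That is presumably why JavaScript-dependent specs failed with 'JavaScript is disabled' only when a determine-partitions job ran. The patch threads the HadoopDruidIndexerConfig into injectSystemProperties so every job, including the determine-partitions ones, also gets the whitelisted Druid properties set directly in its Configuration. A minimal sketch of that new behaviour, with a stand-in helper and an example property name; this is not the committed code:

// Stand-in sketch for the new JobHelper.injectSystemProperties(Configuration, HadoopDruidIndexerConfig);
// the property name and value are assumptions chosen for illustration.
import java.util.Map;
import org.apache.hadoop.conf.Configuration;

public class InjectSystemPropertiesSketch
{
  static Configuration injectSystemProperties(Configuration conf, Map<String, String> allowedDruidProperties)
  {
    // New pass added by the patch: whitelisted Druid properties are copied into the job
    // Configuration (the pre-existing "hadoop."-prefix handling is unchanged and omitted here).
    for (Map.Entry<String, String> e : allowedDruidProperties.entrySet()) {
      conf.set(e.getKey(), e.getValue());
    }
    return conf;
  }

  public static void main(String[] args)
  {
    Configuration groupByJobConf = new Configuration();
    injectSystemProperties(groupByJobConf, Map.of("druid.javascript.enabled", "true"));
    // With the old code path, nothing put this key into a determine-partitions job's Configuration.
    System.out.println(groupByJobConf.get("druid.javascript.enabled")); // prints "true"
  }
}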

DetermineHashedPartitionsJob.java

@@ -96,7 +96,7 @@ public class DetermineHashedPartitionsJob implements Jobby
StringUtils.format("%s-determine_partitions_hashed-%s", config.getDataSource(), config.getIntervals())
);
JobHelper.injectSystemProperties(groupByJob);
JobHelper.injectSystemProperties(groupByJob.getConfiguration(), config);
config.addJobProperties(groupByJob);
groupByJob.setMapperClass(DetermineCardinalityMapper.class);
groupByJob.setMapOutputKeyClass(LongWritable.class);

DeterminePartitionsJob.java

@@ -135,7 +135,7 @@ public class DeterminePartitionsJob implements Jobby
StringUtils.format("%s-determine_partitions_groupby-%s", config.getDataSource(), config.getIntervals())
);
JobHelper.injectSystemProperties(groupByJob);
JobHelper.injectSystemProperties(groupByJob.getConfiguration(), config);
config.addJobProperties(groupByJob);
groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class);
@@ -191,7 +191,7 @@
dimSelectionJob.getConfiguration().set("io.sort.record.percent", "0.19");
JobHelper.injectSystemProperties(dimSelectionJob);
JobHelper.injectSystemProperties(dimSelectionJob.getConfiguration(), config);
config.addJobProperties(dimSelectionJob);
if (!partitionsSpec.isAssumeGrouped()) {

HadoopDruidIndexerConfig.java

@@ -379,12 +379,25 @@ public class HadoopDruidIndexerConfig
return schema.getTuningConfig().getMaxParseExceptions();
}
public Map<String, String> getAllowedProperties()
{
Map<String, String> allowedPropertiesMap = new HashMap<>();
for (String propName : PROPERTIES.stringPropertyNames()) {
for (String prefix : allowedHadoopPrefix) {
if (propName.equals(prefix) || propName.startsWith(prefix + ".")) {
allowedPropertiesMap.put(propName, PROPERTIES.getProperty(propName));
break;
}
}
}
return allowedPropertiesMap;
}
boolean isUseYarnRMJobStatusFallback()
{
return schema.getTuningConfig().isUseYarnRMJobStatusFallback();
}
void setHadoopJobIdFileName(String hadoopJobIdFileName)
{
this.hadoopJobIdFileName = hadoopJobIdFileName;
@@ -597,9 +610,4 @@ public class HadoopDruidIndexerConfig
Preconditions.checkNotNull(schema.getIOConfig().getSegmentOutputPath(), "segmentOutputPath");
Preconditions.checkNotNull(schema.getTuningConfig().getVersion(), "version");
}
List<String> getAllowedHadoopPrefix()
{
return allowedHadoopPrefix;
}
}
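The new getAllowedProperties() above is where the prefix whitelist is applied: a property is kept if it equals an allowed prefix or starts with that prefix plus a dot. A self-contained illustration of the rule, using an assumed whitelist and the same style of property names as the new unit test further down:

// Illustration of the whitelist rule used by getAllowedProperties(); the prefix list and
// property names are example values, not Druid defaults.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class AllowedPropertiesExample
{
  static Map<String, String> filter(Properties props, List<String> allowedPrefixes)
  {
    Map<String, String> allowed = new HashMap<>();
    for (String propName : props.stringPropertyNames()) {
      for (String prefix : allowedPrefixes) {
        // Keep "prefix" itself or anything under "prefix." -- the same rule as the patch.
        if (propName.equals(prefix) || propName.startsWith(prefix + ".")) {
          allowed.put(propName, props.getProperty(propName));
          break;
        }
      }
    }
    return allowed;
  }

  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.setProperty("druid.javascript.enableTest", "true");
    props.setProperty("invalid.test.enableTest", "true");

    // Prints {druid.javascript.enableTest=true}; the "invalid." property is filtered out.
    System.out.println(filter(props, List.of("druid.javascript")));
  }
}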

IndexGeneratorJob.java

@@ -105,7 +105,7 @@ public class IndexGeneratorJob implements Jobby
public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
{
final Configuration conf = JobHelper.injectSystemProperties(new Configuration());
final Configuration conf = JobHelper.injectSystemProperties(new Configuration(), config);
config.addJobProperties(conf);
final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.JSON_MAPPER;
@@ -167,10 +167,10 @@
job.getConfiguration().set("io.sort.record.percent", "0.23");
JobHelper.injectSystemProperties(job);
JobHelper.injectSystemProperties(job.getConfiguration(), config);
config.addJobProperties(job);
// inject druid properties like deep storage bindings
JobHelper.injectDruidProperties(job.getConfiguration(), config.getAllowedHadoopPrefix());
JobHelper.injectDruidProperties(job.getConfiguration(), config);
job.setMapperClass(IndexGeneratorMapper.class);
job.setMapOutputValueClass(BytesWritable.class);

JobHelper.java

@@ -305,36 +305,26 @@
return SNAPSHOT_JAR.matcher(jarFile.getName()).matches();
}
public static void injectSystemProperties(Job job)
{
injectSystemProperties(job.getConfiguration());
}
public static void injectDruidProperties(Configuration configuration, List<String> listOfAllowedPrefix)
public static void injectDruidProperties(Configuration configuration, HadoopDruidIndexerConfig hadoopDruidIndexerConfig)
{
String mapJavaOpts = StringUtils.nullToEmptyNonDruidDataString(configuration.get(MRJobConfig.MAP_JAVA_OPTS));
String reduceJavaOpts = StringUtils.nullToEmptyNonDruidDataString(configuration.get(MRJobConfig.REDUCE_JAVA_OPTS));
for (String propName : HadoopDruidIndexerConfig.PROPERTIES.stringPropertyNames()) {
for (String prefix : listOfAllowedPrefix) {
if (propName.equals(prefix) || propName.startsWith(prefix + ".")) {
mapJavaOpts = StringUtils.format(
"%s -D%s=%s",
mapJavaOpts,
propName,
HadoopDruidIndexerConfig.PROPERTIES.getProperty(propName)
);
reduceJavaOpts = StringUtils.format(
"%s -D%s=%s",
reduceJavaOpts,
propName,
HadoopDruidIndexerConfig.PROPERTIES.getProperty(propName)
);
break;
}
}
for (Map.Entry<String, String> allowedProperties : hadoopDruidIndexerConfig.getAllowedProperties().entrySet()) {
mapJavaOpts = StringUtils.format(
"%s -D%s=%s",
mapJavaOpts,
allowedProperties.getKey(),
allowedProperties.getValue()
);
reduceJavaOpts = StringUtils.format(
"%s -D%s=%s",
reduceJavaOpts,
allowedProperties.getKey(),
allowedProperties.getValue()
);
}
if (!Strings.isNullOrEmpty(mapJavaOpts)) {
configuration.set(MRJobConfig.MAP_JAVA_OPTS, mapJavaOpts);
}
@@ -343,13 +333,18 @@
}
}
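For reference, the rewritten loop above simply turns each whitelisted property into a -D flag appended to the map and reduce JVM options. A standalone illustration with assumed property values, using plain String.format in place of Druid's StringUtils.format:

// Standalone illustration: each whitelisted property becomes a -D flag appended to the
// existing map JVM opts (the reduce opts are built the same way).
import java.util.LinkedHashMap;
import java.util.Map;

public class DruidPropsToJavaOptsExample
{
  public static void main(String[] args)
  {
    Map<String, String> allowedProperties = new LinkedHashMap<>();
    allowedProperties.put("druid.javascript.enabled", "true"); // example values only
    allowedProperties.put("druid.storage.type", "hdfs");

    String mapJavaOpts = "-Xmx1g"; // pretend this was already configured on the job
    for (Map.Entry<String, String> e : allowedProperties.entrySet()) {
      mapJavaOpts = String.format("%s -D%s=%s", mapJavaOpts, e.getKey(), e.getValue());
    }

    // Prints: -Xmx1g -Ddruid.javascript.enabled=true -Ddruid.storage.type=hdfs
    System.out.println(mapJavaOpts);
  }
}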
public static Configuration injectSystemProperties(Configuration conf)
public static Configuration injectSystemProperties(Configuration conf, HadoopDruidIndexerConfig hadoopDruidIndexerConfig)
{
for (String propName : HadoopDruidIndexerConfig.PROPERTIES.stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), HadoopDruidIndexerConfig.PROPERTIES.getProperty(propName));
}
}
for (Map.Entry<String, String> allowedProperties : hadoopDruidIndexerConfig.getAllowedProperties().entrySet()) {
conf.set(allowedProperties.getKey(), allowedProperties.getValue());
}
return conf;
}
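The first loop in the reworked injectSystemProperties is the pre-existing behaviour: any system property named hadoop.<key> is copied into the Configuration with the "hadoop." prefix stripped; the second loop is the new whitelisted-property pass sketched earlier. A tiny standalone example of the prefix-stripping rule, reusing the hadoop.test.enableTest name from the new unit test:

// Standalone example of the pre-existing "hadoop." prefix stripping that the patch keeps.
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;

public class HadoopPrefixStrippingExample
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.setProperty("hadoop.test.enableTest", "true"); // same name the new unit test uses

    Configuration conf = new Configuration();
    for (String propName : props.stringPropertyNames()) {
      if (propName.startsWith("hadoop.")) {
        conf.set(propName.substring("hadoop.".length()), props.getProperty(propName));
      }
    }

    // The key is readable without its "hadoop." prefix.
    System.out.println(conf.get("test.enableTest")); // prints "true"
  }
}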
@@ -364,7 +359,7 @@
);
job.getConfiguration().set("io.sort.record.percent", "0.19");
injectSystemProperties(job);
injectSystemProperties(job.getConfiguration(), config);
config.addJobProperties(job);
config.addInputPaths(job);
@@ -401,7 +396,7 @@
Path workingPath = config.makeIntermediatePath();
log.info("Deleting path[%s]", workingPath);
try {
Configuration conf = injectSystemProperties(new Configuration());
Configuration conf = injectSystemProperties(new Configuration(), config);
config.addJobProperties(conf);
workingPath.getFileSystem(conf).delete(workingPath, true);
}
@@ -429,7 +424,7 @@
Path workingPath = config.makeIntermediatePath();
log.info("Deleting path[%s]", workingPath);
try {
Configuration conf = injectSystemProperties(new Configuration());
Configuration conf = injectSystemProperties(new Configuration(), config);
config.addJobProperties(conf);
workingPath.getFileSystem(conf).delete(workingPath, true);
}

JobHelperTest.java

@@ -19,18 +19,25 @@
package org.apache.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.path.StaticPathSpec;
import org.apache.druid.java.util.common.CompressionUtilsTest;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.timeline.DataSegment;
@@ -41,6 +48,7 @@ import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Progressable;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -59,6 +67,47 @@ import java.util.Map;
*/
public class JobHelperTest
{
private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper();
private static final DataSchema DATA_SCHEMA = new DataSchema(
"test_ds",
JSON_MAPPER.convertValue(
new HadoopyStringInputRowParser(
new JSONParseSpec(
new TimestampSpec("t", "auto", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim1t", "dim2")),
null,
null
),
new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of()
)
),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null,
JSON_MAPPER
);
private static final HadoopIOConfig IO_CONFIG = new HadoopIOConfig(
JSON_MAPPER.convertValue(
new StaticPathSpec("dummyPath", null),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
),
null,
"dummyOutputPath"
);
private static final HadoopTuningConfig TUNING_CONFIG = HadoopTuningConfig
.makeDefaultTuningConfig()
.withWorkingPath("dummyWorkingPath");
private static final HadoopIngestionSpec DUMMY_SPEC = new HadoopIngestionSpec(DATA_SCHEMA, IO_CONFIG, TUNING_CONFIG);
private static final String VALID_DRUID_PROP = "druid.javascript.enableTest";
private static final String VALID_HADOOP_PREFIX = "hadoop.";
private static final String VALID_HADOOP_PROP = "test.enableTest";
private static final String INVALID_PROP = "invalid.test.enableTest";
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -140,8 +189,20 @@ public class JobHelperTest
)
)
);
HadoopDruidIndexerConfig.PROPERTIES.setProperty(VALID_DRUID_PROP, "true");
HadoopDruidIndexerConfig.PROPERTIES.setProperty(VALID_HADOOP_PREFIX + VALID_HADOOP_PROP, "true");
HadoopDruidIndexerConfig.PROPERTIES.setProperty(INVALID_PROP, "true");
}
@After
public void teardown()
{
HadoopDruidIndexerConfig.PROPERTIES.remove(VALID_DRUID_PROP);
HadoopDruidIndexerConfig.PROPERTIES.remove(VALID_HADOOP_PREFIX + VALID_HADOOP_PROP);
HadoopDruidIndexerConfig.PROPERTIES.remove(INVALID_PROP);
}
@Test
public void testEnsurePathsAddsProperties()
{
@@ -160,6 +221,21 @@ public class JobHelperTest
);
}
@Test
public void testInjectSystemProperties()
{
HadoopDruidIndexerConfig hadoopDruidIndexerConfig = new HadoopDruidIndexerConfig(DUMMY_SPEC);
Configuration config = new Configuration();
JobHelper.injectSystemProperties(config, hadoopDruidIndexerConfig);
// This should be injected
Assert.assertNotNull(config.get(VALID_DRUID_PROP));
// This should be injected
Assert.assertNotNull(config.get(VALID_HADOOP_PROP));
// This should not be injected
Assert.assertNull(config.get(INVALID_PROP));
}
@Test
public void testGoogleGetURIFromSegment() throws URISyntaxException
{