From 6ad451a44ae16b70eef7d9bdbe264db46b6ddbc0 Mon Sep 17 00:00:00 2001
From: Michael Schiff
Date: Fri, 26 Jun 2015 00:57:22 -0700
Subject: [PATCH] JobHelper.ensurePaths will set job properties from config
 (tuningConfig.jobProperties) before adding input paths to the config.

Adding input paths will create Path and FileSystem instances which may depend
on the values in the job config. This allows all properties to be set from the
spec file, avoiding having to directly edit cluster XML files.

IndexGeneratorJob.run adds job properties before adding input paths (adding
input paths may depend on having job properties set).

JobHelperTest confirms that JobHelper.ensurePaths adds job properties.

Javadoc for addInputPaths explains its relationship with addJobProperties.
---
 .../indexer/HadoopDruidIndexerConfig.java       |   8 +
 .../io/druid/indexer/IndexGeneratorJob.java     |   2 +-
 .../main/java/io/druid/indexer/JobHelper.java   |   1 +
 .../java/io/druid/indexer/JobHelperTest.java    | 164 ++++++++++++++++++
 4 files changed, 174 insertions(+), 1 deletion(-)
 create mode 100644 indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
index 25672e85cad..6aeaa6430ab 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
@@ -316,6 +316,14 @@ public class HadoopDruidIndexerConfig
     return schema.getTuningConfig().getShardSpecs().get(bucket.time).get(bucket.partitionNum);
   }
 
+  /**
+   * The Job instance should have its Configuration set (by calling {@link #addJobProperties(Job)}
+   * or via injected system properties) before this method is called, since the {@link PathSpec}
+   * may create objects which depend on the values of those configurations.
+   * @param job the job whose Configuration is already populated
+   * @return the same job, with input paths added
+   * @throws IOException if the input paths cannot be resolved
+   */
   public Job addInputPaths(Job job) throws IOException
   {
     return pathSpec.addInputPaths(this, job);
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index 6f87db20235..f69af4aa27e 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -158,8 +158,8 @@ public class IndexGeneratorJob implements Jobby
       job.setOutputFormatClass(IndexGeneratorOutputFormat.class);
       FileOutputFormat.setOutputPath(job, config.makeIntermediatePath());
 
-      config.addInputPaths(job);
       config.addJobProperties(job);
+      config.addInputPaths(job);
 
       // hack to get druid.processing.bitmap property passed down to hadoop job.
       // once IndexIO doesn't rely on globally injected properties, we can move this into the HadoopTuningConfig.
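
Why the ordering matters, in sketch form: resolving an input Path to a
FileSystem consults properties such as "fs.s3.impl" in the job Configuration,
so those properties must already be present when paths are materialized. The
snippet below is a hypothetical standalone illustration, not Druid code; the
class name, bucket path, and credential values are placeholders (the two
fs.s3 keys are the ones exercised by the new test, and the secret key is
included only so the sketch can run):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class OrderingSketch
    {
      public static void main(String[] args) throws Exception
      {
        Configuration conf = new Configuration();

        // What addJobProperties effectively does: copy tuningConfig.jobProperties
        // into the job Configuration.
        conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        conf.set("fs.s3.awsAccessKeyId", "THISISMYACCESSKEY");
        conf.set("fs.s3.awsSecretAccessKey", "THISISMYSECRETKEY"); // placeholder

        // What addInputPaths may trigger: resolving a Path consults "fs.s3.impl"
        // in the Configuration. If the properties above were set after this call,
        // resolution would fall back to whatever the cluster XML files contain.
        FileSystem fs = new Path("s3://example-bucket/data").getFileSystem(conf);
        System.out.println(fs.getClass().getName());
      }
    }

Reversing the order (resolving the path before setting the properties)
reproduces the failure mode this patch fixes.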
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
index 19c743b8240..d9a3de92ad6 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
@@ -163,6 +163,7 @@ public class JobHelper
       job.getConfiguration().set("io.sort.record.percent", "0.19");
 
       injectSystemProperties(job);
+      config.addJobProperties(job);
       config.addInputPaths(job);
     }
 
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java
new file mode 100644
index 00000000000..8886c6e8a3c
--- /dev/null
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java
@@ -0,0 +1,164 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright 2012 - 2015 Metamarkets Group Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.druid.indexer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.metamx.common.Granularity;
+import io.druid.data.input.impl.CSVParseSpec;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.StringInputRowParser;
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.granularity.QueryGranularity;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class JobHelperTest
+{
+  @Rule
+  public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private HadoopDruidIndexerConfig config;
+  private File tmpDir;
+  private File dataFile;
+  private Interval interval = new Interval("2014-10-22T00:00:00Z/P1D");
+
+  @Before
+  public void setup() throws Exception
+  {
+    tmpDir = temporaryFolder.newFile();
+    dataFile = temporaryFolder.newFile();
+    config = new HadoopDruidIndexerConfig(
+        new HadoopIngestionSpec(
+            new DataSchema(
+                "website",
+                new StringInputRowParser(
+                    new CSVParseSpec(
+                        new TimestampSpec("timestamp", "yyyyMMddHH", null),
+                        new DimensionsSpec(ImmutableList.of("host"), null, null),
+                        null,
+                        ImmutableList.of("timestamp", "host", "visited_num")
+                    )
+                ),
+                new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")},
+                new UniformGranularitySpec(
+                    Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval)
+                )
+            ),
+            new HadoopIOConfig(
+                ImmutableMap.<String, Object>of(
+                    "paths",
+                    dataFile.getCanonicalPath(),
+                    "type",
+                    "static"
+                ),
+                null,
+                tmpDir.getCanonicalPath()
+            ),
+            new HadoopTuningConfig(
+                tmpDir.getCanonicalPath(),
+                null,
+                null,
+                null,
+                null,
+                null,
+                false,
+                false,
+                false,
+                false,
+                // Map of job properties
+                ImmutableMap.of(
+                    "fs.s3.impl",
+                    "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
+                    "fs.s3.awsAccessKeyId",
+                    "THISISMYACCESSKEY"
+                ),
+                false,
+                false,
+                false,
+                null,
+                null
+            )
+        )
+    );
+  }
+
+  @Test
+  public void testEnsurePathsAddsProperties() throws Exception
+  {
+    HadoopDruidIndexerConfigSpy hadoopDruidIndexerConfigSpy = new HadoopDruidIndexerConfigSpy(config);
+    JobHelper.ensurePaths(hadoopDruidIndexerConfigSpy);
+
+    Map<String, String> jobProperties = hadoopDruidIndexerConfigSpy.getJobProperties();
+    Assert.assertEquals(
+        "fs.s3.impl property set correctly",
+        "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
+        jobProperties.get("fs.s3.impl")
+    );
+    Assert.assertEquals(
+        "fs.s3.awsAccessKeyId property set correctly",
+        "THISISMYACCESSKEY",
+        jobProperties.get("fs.s3.awsAccessKeyId")
+    );
+  }
+
+  private static class HadoopDruidIndexerConfigSpy extends HadoopDruidIndexerConfig
+  {
+    private final Map<String, String> jobProperties = new HashMap<>();
+
+    public HadoopDruidIndexerConfigSpy(HadoopDruidIndexerConfig delegate)
+    {
+      super(delegate.getSchema());
+    }
+
+    @Override
+    public Job addInputPaths(Job job) throws IOException
+    {
+      // Record the job Configuration as seen at the point where input paths
+      // would be added; the test asserts that jobProperties are already set.
+      Configuration configuration = job.getConfiguration();
+      for (Map.Entry<String, String> entry : configuration) {
+        jobProperties.put(entry.getKey(), entry.getValue());
+      }
+      return job;
+    }
+
+    public Map<String, String> getJobProperties()
+    {
+      return jobProperties;
+    }
+  }
+}
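
For downstream job implementations, the contract established above can be
summarized in a short sketch. This is hypothetical driver code, not part of
the patch; the class name, method name, job name, and Job.getInstance
bootstrapping are illustrative only:

    import java.io.IOException;

    import io.druid.indexer.HadoopDruidIndexerConfig;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class DriverSketch
    {
      // 'config' is assumed to be built from a spec whose tuningConfig.jobProperties
      // carries filesystem settings such as fs.s3.impl.
      static Job configure(HadoopDruidIndexerConfig config) throws IOException
      {
        Job job = Job.getInstance(new Configuration(), "example-index-job");
        config.addJobProperties(job);     // 1) spec-file jobProperties -> job Configuration
        return config.addInputPaths(job); // 2) the PathSpec may now create Path/FileSystem
                                          //    instances that read that Configuration
      }
    }

This is the same properties-then-paths sequence that IndexGeneratorJob.run and
JobHelper.ensurePaths follow after this patch.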