diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
index 25672e85cad..6aeaa6430ab 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
@@ -316,6 +316,14 @@ public class HadoopDruidIndexerConfig
     return schema.getTuningConfig().getShardSpecs().get(bucket.time).get(bucket.partitionNum);
   }
 
+  /**
+   * Job instance should have Configuration set (by calling {@link #addJobProperties(Job)}
+   * or via injected system properties) before this method is called. The {@link PathSpec} may
+   * create objects which depend on the values of these configurations.
+   * @param job
+   * @return
+   * @throws IOException
+   */
   public Job addInputPaths(Job job) throws IOException
   {
     return pathSpec.addInputPaths(this, job);
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index 6f87db20235..f69af4aa27e 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -158,8 +158,8 @@ public class IndexGeneratorJob implements Jobby
       job.setOutputFormatClass(IndexGeneratorOutputFormat.class);
       FileOutputFormat.setOutputPath(job, config.makeIntermediatePath());
 
-      config.addInputPaths(job);
       config.addJobProperties(job);
+      config.addInputPaths(job);
 
       // hack to get druid.processing.bitmap property passed down to hadoop job.
       // once IndexIO doesn't rely on globally injected properties, we can move this into the HadoopTuningConfig.
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
index 19c743b8240..d9a3de92ad6 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
@@ -163,6 +163,7 @@ public class JobHelper
     job.getConfiguration().set("io.sort.record.percent", "0.19");
 
     injectSystemProperties(job);
+    config.addJobProperties(job);
     config.addInputPaths(job);
   }
 
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java
new file mode 100644
index 00000000000..8886c6e8a3c
--- /dev/null
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java
@@ -0,0 +1,164 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright 2012 - 2015 Metamarkets Group Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.druid.indexer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.metamx.common.Granularity;
+import io.druid.data.input.impl.CSVParseSpec;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.StringInputRowParser;
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.granularity.QueryGranularity;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ */
+public class JobHelperTest
+{
+
+  public final
+  @Rule
+  TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private HadoopDruidIndexerConfig config;
+  private File tmpDir;
+  private File dataFile;
+  private Interval interval = new Interval("2014-10-22T00:00:00Z/P1D");
+
+  @Before
+  public void setup() throws Exception
+  {
+    tmpDir = temporaryFolder.newFile();
+    dataFile = temporaryFolder.newFile();
+    config = new HadoopDruidIndexerConfig(
+        new HadoopIngestionSpec(
+            new DataSchema(
+                "website",
+                new StringInputRowParser(
+                    new CSVParseSpec(
+                        new TimestampSpec("timestamp", "yyyyMMddHH", null),
+                        new DimensionsSpec(ImmutableList.of("host"), null, null),
+                        null,
+                        ImmutableList.of("timestamp", "host", "visited_num")
+                    )
+                ),
+                new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")},
+                new UniformGranularitySpec(
+                    Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval)
+                )
+            ),
+            new HadoopIOConfig(
+                ImmutableMap.<String, Object>of(
+                    "paths",
+                    dataFile.getCanonicalPath(),
+                    "type",
+                    "static"
+                ),
+                null,
+                tmpDir.getCanonicalPath()
+            ),
+            new HadoopTuningConfig(
+                tmpDir.getCanonicalPath(),
+                null,
+                null,
+                null,
+                null,
+                null,
+                false,
+                false,
+                false,
+                false,
+                //Map of job properties
+                ImmutableMap.of(
+                    "fs.s3.impl",
+                    "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
+                    "fs.s3.awsAccessKeyId",
+                    "THISISMYACCESSKEY"
+                ),
+                false,
+                false,
+                false,
+                null,
+                null
+            )
+        )
+    );
+  }
+
+  @Test
+  public void testEnsurePathsAddsProperties() throws Exception
+  {
+    HadoopDruidIndexerConfigSpy hadoopDruidIndexerConfigSpy = new HadoopDruidIndexerConfigSpy(config);
+    JobHelper.ensurePaths(hadoopDruidIndexerConfigSpy);
+    Map<String, String> jobProperties = hadoopDruidIndexerConfigSpy.getJobProperties();
+    Assert.assertEquals(
+        "fs.s3.impl property set correctly",
+        "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
+        jobProperties.get("fs.s3.impl")
+    );
+    Assert.assertEquals(
+        "fs.s3.awsAccessKeyId property set correctly",
+        "THISISMYACCESSKEY",
+        jobProperties.get("fs.s3.awsAccessKeyId")
+    );
+  }
+
+
+
+  private static class HadoopDruidIndexerConfigSpy extends HadoopDruidIndexerConfig
+  {
+
+    private HadoopDruidIndexerConfig delegate;
+    private Map<String, String> jobProperties = new HashMap<String, String>();
+
+    public HadoopDruidIndexerConfigSpy(HadoopDruidIndexerConfig delegate)
+    {
+      super(delegate.getSchema());
+      this.delegate = delegate;
+    }
+
+    @Override
+    public Job addInputPaths(Job job) throws IOException
+    {
+      Configuration configuration = job.getConfiguration();
+      for (Map.Entry<String, String> en : configuration) {
+        jobProperties.put(en.getKey(), en.getValue());
+      }
+      return job;
+    }
+
+    public Map<String, String> getJobProperties() { return jobProperties; }
+  }
+}
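
Editorial note on the ordering this patch enforces (the sketch below is illustrative, not part of the patch): the tuning config's jobProperties must land in the Job's Configuration before addInputPaths(job) runs, because the PathSpec may construct objects, such as a NativeS3FileSystem, that read those properties while paths are being resolved. A minimal Java sketch, assuming a fully built HadoopDruidIndexerConfig like the one in JobHelperTest above; the JobSetupSketch class and configureJob method are hypothetical names:

import io.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

// Hypothetical helper, for illustration only; not part of this patch.
public class JobSetupSketch
{
  public static Job configureJob(HadoopDruidIndexerConfig config) throws IOException
  {
    Job job = Job.getInstance();

    // First copy tuningConfig.jobProperties (e.g. fs.s3.impl and
    // fs.s3.awsAccessKeyId from JobHelperTest) into the Job's Configuration.
    config.addJobProperties(job);

    // Only then resolve input paths: a static PathSpec pointing at s3 input
    // needs those filesystem properties here. Reversing these two calls (the
    // pre-patch order in IndexGeneratorJob) leaves the PathSpec without them.
    return config.addInputPaths(job);
  }
}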