JobHelper.ensurePaths now sets job properties from the config (tuningConfig.jobProperties) before adding input paths to the job.

Adding input paths creates Path and FileSystem instances that may depend on values in the job configuration.
Setting the properties first allows them all to be supplied from the spec file, so the cluster's XML configuration files do not have to be edited directly.
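
To illustrate why the order matters, here is a minimal sketch (the helper class below is hypothetical; only the fs.s3.* property names come from the spec used in the test in this commit) of how resolving an input path reads the job Configuration:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class InputPathSketch
{
  public static void addInputPath(Job job, String pathString) throws IOException
  {
    Path path = new Path(pathString);
    // getFileSystem() consults job.getConfiguration(); for an s3:// path this is
    // where properties such as fs.s3.impl and fs.s3.awsAccessKeyId are read,
    // so they must already be set on the job at this point.
    FileSystem fs = path.getFileSystem(job.getConfiguration());
    if (fs.exists(path)) {
      FileInputFormat.addInputPath(job, path);
    }
  }
}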

IndexGeneratorJob.run now adds job properties before adding input paths (adding input paths may depend on the job properties already being set).

JobHelperTest confirms that JobHelper.ensurePaths adds the job properties.

Adds javadoc to addInputPaths explaining its relationship with addJobProperties.
Michael Schiff 2015-06-26 00:57:22 -07:00
parent 2c463ae435
commit 6ad451a44a
4 changed files with 174 additions and 1 deletion

HadoopDruidIndexerConfig.java

@@ -316,6 +316,14 @@ public class HadoopDruidIndexerConfig
return schema.getTuningConfig().getShardSpecs().get(bucket.time).get(bucket.partitionNum);
}
/**
* Job instance should have Configuration set (by calling {@link #addJobProperties(Job)}
* or via injected system properties) before this method is called. The {@link PathSpec} may
* create objects which depend on the values of these configurations.
* @param job the Hadoop Job to add input paths to; its Configuration should already contain the required job properties
* @return the same Job, with input paths added by the {@link PathSpec}
* @throws IOException
*/
public Job addInputPaths(Job job) throws IOException
{
return pathSpec.addInputPaths(this, job);

IndexGeneratorJob.java

@@ -158,8 +158,8 @@ public class IndexGeneratorJob implements Jobby
job.setOutputFormatClass(IndexGeneratorOutputFormat.class);
FileOutputFormat.setOutputPath(job, config.makeIntermediatePath());
-config.addInputPaths(job);
 config.addJobProperties(job);
+config.addInputPaths(job);
// hack to get druid.processing.bitmap property passed down to hadoop job.
// once IndexIO doesn't rely on globally injected properties, we can move this into the HadoopTuningConfig.
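
A hedged caller-side sketch of the ordering established above (the wrapper class here is hypothetical; addJobProperties and addInputPaths are the HadoopDruidIndexerConfig methods shown in the first hunk):

import io.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class IndexJobSetupSketch
{
  public static Job configure(HadoopDruidIndexerConfig config, Job job) throws IOException
  {
    // Copy tuningConfig.jobProperties (e.g. fs.s3.impl) onto job.getConfiguration() first...
    config.addJobProperties(job);
    // ...so the Path/FileSystem instances created while adding input paths can see them.
    return config.addInputPaths(job);
  }
}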

JobHelper.java

@@ -163,6 +163,7 @@ public class JobHelper
job.getConfiguration().set("io.sort.record.percent", "0.19");
injectSystemProperties(job);
config.addJobProperties(job);
config.addInputPaths(job);
}
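
For reference, a minimal standalone sketch of the property copy the commit message describes (tuningConfig.jobProperties onto the job's Configuration); the helper class and method name below are illustrative, not the committed code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

import java.util.Map;

public class JobPropertiesSketch
{
  // Copies the spec's job properties (e.g. "fs.s3.impl" ->
  // "org.apache.hadoop.fs.s3native.NativeS3FileSystem") onto the job's Configuration.
  public static void applyJobProperties(Map<String, String> jobProperties, Job job)
  {
    Configuration conf = job.getConfiguration();
    for (Map.Entry<String, String> entry : jobProperties.entrySet()) {
      conf.set(entry.getKey(), entry.getValue());
    }
  }
}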

JobHelperTest.java

@@ -0,0 +1,164 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.indexer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * Verifies that {@link JobHelper#ensurePaths} applies the spec's job properties before adding input paths.
 */
public class JobHelperTest
{
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
private HadoopDruidIndexerConfig config;
private File tmpDir;
private File dataFile;
private Interval interval = new Interval("2014-10-22T00:00:00Z/P1D");
@Before
public void setup() throws Exception
{
tmpDir = temporaryFolder.newFile();
dataFile = temporaryFolder.newFile();
config = new HadoopDruidIndexerConfig(
new HadoopIngestionSpec(
new DataSchema(
"website",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
),
new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval)
)
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(
"paths",
dataFile.getCanonicalPath(),
"type",
"static"
),
null,
tmpDir.getCanonicalPath()
),
new HadoopTuningConfig(
tmpDir.getCanonicalPath(),
null,
null,
null,
null,
null,
false,
false,
false,
false,
//Map of job properties
ImmutableMap.of(
"fs.s3.impl",
"org.apache.hadoop.fs.s3native.NativeS3FileSystem",
"fs.s3.awsAccessKeyId",
"THISISMYACCESSKEY"
),
false,
false,
false,
null,
null
)
)
);
}
@Test
public void testEnsurePathsAddsProperties() throws Exception
{
HadoopDruidIndexerConfigSpy hadoopDruidIndexerConfigSpy = new HadoopDruidIndexerConfigSpy(config);
JobHelper.ensurePaths(hadoopDruidIndexerConfigSpy);
Map<String, String> jobProperties = hadoopDruidIndexerConfigSpy.getJobProperties();
Assert.assertEquals(
"fs.s3.impl property set correctly",
"org.apache.hadoop.fs.s3native.NativeS3FileSystem",
jobProperties.get("fs.s3.impl")
);
Assert.assertEquals(
"fs.s3.accessKeyId property set correctly",
"THISISMYACCESSKEY",
jobProperties.get("fs.s3.awsAccessKeyId")
);
}
private static class HadoopDruidIndexerConfigSpy extends HadoopDruidIndexerConfig
{
private HadoopDruidIndexerConfig delegate;
private Map<String, String> jobProperties = new HashMap<String, String>();
public HadoopDruidIndexerConfigSpy(HadoopDruidIndexerConfig delegate)
{
super(delegate.getSchema());
this.delegate = delegate;
}
@Override
public Job addInputPaths(Job job) throws IOException
{
Configuration configuration = job.getConfiguration();
for (Map.Entry<String, String> en : configuration) {
jobProperties.put(en.getKey(), en.getValue());
}
return job;
}
public Map<String, String> getJobProperties() { return jobProperties; }
}
}