Merge pull request #1484 from tubemogul/feature/1463

JobHelper.ensurePaths will set job properties from config (tuningConf…
Charles Allen committed b2bc46be17 on 2015-07-07 10:58:16 -07:00
4 changed files with 174 additions and 1 deletion
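In short, the fix is about call ordering: the tuning config's job properties must be copied into the Hadoop Configuration before the PathSpec resolves input paths, because a PathSpec may construct filesystem-dependent objects (for example an S3 FileSystem) from those properties. A minimal sketch of the intended order, assuming only the two HadoopDruidIndexerConfig methods that appear in this diff (the class and job names here are illustrative):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

class JobSetupSketch
{
  // Sketch only: properties first, paths second.
  static Job configure(HadoopDruidIndexerConfig config) throws IOException
  {
    Job job = Job.getInstance(new Configuration(), "example-indexer-job");
    config.addJobProperties(job);     // tuningConfig jobProperties -> job Configuration
    return config.addInputPaths(job); // the PathSpec may now depend on those properties
  }
}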


@@ -341,6 +341,14 @@ public class HadoopDruidIndexerConfig
     return schema.getTuningConfig().getShardSpecs().get(bucket.time).get(bucket.partitionNum);
   }
 
+  /**
+   * The Job instance should have its Configuration set (by calling {@link #addJobProperties(Job)}
+   * or via injected system properties) before this method is called, since the {@link PathSpec}
+   * may create objects that depend on the values of these configurations.
+   * @param job the Hadoop job to which input paths are added
+   * @return the passed-in job, with its input paths configured
+   * @throws IOException if the {@link PathSpec} fails to add the paths
+   */
   public Job addInputPaths(Job job) throws IOException
   {
     return pathSpec.addInputPaths(this, job);
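addJobProperties itself is not part of this diff; a hedged sketch of its assumed behavior, namely copying every entry of the tuning config's jobProperties map into the job's Configuration:

// Assumed body of HadoopDruidIndexerConfig.addJobProperties (not shown in this diff).
public void addJobProperties(Job job)
{
  Configuration conf = job.getConfiguration();
  for (Map.Entry<String, String> entry : schema.getTuningConfig().getJobProperties().entrySet()) {
    conf.set(entry.getKey(), entry.getValue());
  }
}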


@@ -158,8 +158,8 @@ public class IndexGeneratorJob implements Jobby
       job.setOutputFormatClass(IndexGeneratorOutputFormat.class);
       FileOutputFormat.setOutputPath(job, config.makeIntermediatePath());
-      config.addInputPaths(job);
       config.addJobProperties(job);
+      config.addInputPaths(job);
       // hack to get druid.processing.bitmap property passed down to hadoop job.
       // once IndexIO doesn't rely on globally injected properties, we can move this into the HadoopTuningConfig.
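The hack referred to above presumably copies the globally injected property onto the job configuration by hand, along these lines (hypothetical shape; the real code may read Druid's injected Properties rather than System properties):

// Hypothetical shape of the hack described in the comment above.
String bitmapType = System.getProperty("druid.processing.bitmap.type");
if (bitmapType != null) {
  job.getConfiguration().set("druid.processing.bitmap.type", bitmapType);
}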


@@ -163,6 +163,7 @@ public class JobHelper
     job.getConfiguration().set("io.sort.record.percent", "0.19");
     injectSystemProperties(job);
+    config.addJobProperties(job);
     config.addInputPaths(job);
   }
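With that one added line, JobHelper.ensurePaths applies the tuning config's job properties before asking the PathSpec for input paths, which is exactly what the new test below verifies. A hypothetical caller (the fromFile factory and spec.json path are illustrative):

// ensurePaths alone now suffices for tuningConfig jobProperties
// (e.g. fs.s3.impl, S3 credentials) to be visible to the PathSpec.
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File("spec.json"));
JobHelper.ensurePaths(config);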

JobHelperTest.java (new file)

@@ -0,0 +1,164 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.druid.indexer;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
*/
public class JobHelperTest
{
  @Rule
  public final TemporaryFolder temporaryFolder = new TemporaryFolder();

  private HadoopDruidIndexerConfig config;
  private File tmpDir;
  private File dataFile;
  private Interval interval = new Interval("2014-10-22T00:00:00Z/P1D");

  @Before
  public void setup() throws Exception
  {
    tmpDir = temporaryFolder.newFile();
    dataFile = temporaryFolder.newFile();
    config = new HadoopDruidIndexerConfig(
        new HadoopIngestionSpec(
            new DataSchema(
                "website",
                new StringInputRowParser(
                    new CSVParseSpec(
                        new TimestampSpec("timestamp", "yyyyMMddHH", null),
                        new DimensionsSpec(ImmutableList.of("host"), null, null),
                        null,
                        ImmutableList.of("timestamp", "host", "visited_num")
                    )
                ),
                new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")},
                new UniformGranularitySpec(
                    Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval)
                )
            ),
            new HadoopIOConfig(
                ImmutableMap.<String, Object>of(
                    "paths",
                    dataFile.getCanonicalPath(),
                    "type",
                    "static"
                ),
                null,
                tmpDir.getCanonicalPath()
            ),
            new HadoopTuningConfig(
                tmpDir.getCanonicalPath(),
                null,
                null,
                null,
                null,
                null,
                false,
                false,
                false,
                false,
                // map of job properties
                ImmutableMap.of(
                    "fs.s3.impl",
                    "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
                    "fs.s3.awsAccessKeyId",
                    "THISISMYACCESSKEY"
                ),
                false,
                false,
                false,
                null,
                null
            )
        )
    );
  }

  @Test
  public void testEnsurePathsAddsProperties() throws Exception
  {
    HadoopDruidIndexerConfigSpy hadoopDruidIndexerConfigSpy = new HadoopDruidIndexerConfigSpy(config);
    JobHelper.ensurePaths(hadoopDruidIndexerConfigSpy);

    Map<String, String> jobProperties = hadoopDruidIndexerConfigSpy.getJobProperties();
    Assert.assertEquals(
        "fs.s3.impl property set correctly",
        "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
        jobProperties.get("fs.s3.impl")
    );
    Assert.assertEquals(
        "fs.s3.awsAccessKeyId property set correctly",
        "THISISMYACCESSKEY",
        jobProperties.get("fs.s3.awsAccessKeyId")
    );
  }

  private static class HadoopDruidIndexerConfigSpy extends HadoopDruidIndexerConfig
  {
    private HadoopDruidIndexerConfig delegate;
    private Map<String, String> jobProperties = new HashMap<String, String>();

    public HadoopDruidIndexerConfigSpy(HadoopDruidIndexerConfig delegate)
    {
      super(delegate.getSchema());
      this.delegate = delegate;
    }

    @Override
    public Job addInputPaths(Job job) throws IOException
    {
      Configuration configuration = job.getConfiguration();
      for (Map.Entry<String, String> en : configuration) {
        jobProperties.put(en.getKey(), en.getValue());
      }
      return job;
    }

    public Map<String, String> getJobProperties()
    {
      return jobProperties;
    }
  }
}
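The spy works because ensurePaths calls addJobProperties before addInputPaths: by the time the overridden addInputPaths runs, the tuning config's entries have already been copied into the job's Configuration, so snapshotting the Configuration there is enough to verify the fix. Under the old ordering, the two fs.s3.* assertions would have failed.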