Merge pull request #1687 from himanshug/num_reducer_override

Do not let users override Hadoop job settings that Druid code sets explicitly. Hadoop's Configuration is last-write-wins, so each config.addJobProperties(...) call now runs before Druid's own explicit job setup (for example setNumReduceTasks) instead of after it; user-supplied jobProperties can therefore no longer clobber values the indexer depends on.
Fangjin Yang 2015-08-31 09:30:26 -07:00
commit 5e3a83871e
4 changed files with 7 additions and 5 deletions
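
For context, the mechanism behind the fix: Hadoop's Configuration keeps one value per key, and the most recent set() wins. A minimal standalone sketch of that behavior, using plain Hadoop APIs rather than Druid code:

    import org.apache.hadoop.conf.Configuration;

    class LastWriteWinsSketch
    {
      public static void main(String[] args)
      {
        Configuration conf = new Configuration();
        conf.set("mapreduce.job.reduces", "0"); // e.g. a user-supplied job property
        conf.set("mapreduce.job.reduces", "5"); // a later explicit setting replaces it
        System.out.println(conf.get("mapreduce.job.reduces")); // prints 5
      }
    }

Because whichever side writes a key last wins, the diffs below simply move the user-property writes earlier so that Druid's own writes land last.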


@@ -88,6 +88,7 @@ public class DetermineHashedPartitionsJob implements Jobby
 );
 JobHelper.injectSystemProperties(groupByJob);
+config.addJobProperties(groupByJob);
 groupByJob.setMapperClass(DetermineCardinalityMapper.class);
 groupByJob.setMapOutputKeyClass(LongWritable.class);
 groupByJob.setMapOutputValueClass(BytesWritable.class);
@@ -104,7 +105,6 @@ public class DetermineHashedPartitionsJob implements Jobby
 JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), groupByJob);
 config.addInputPaths(groupByJob);
-config.addJobProperties(groupByJob);
 config.intoConfiguration(groupByJob);
 FileOutputFormat.setOutputPath(groupByJob, config.makeGroupedDataDir());
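
The pattern in the two hunks above repeats across the changed files: config.addJobProperties(...) moves from after Druid's explicit job setup to immediately after system-property injection, so user jobProperties are applied first and Druid's own calls land on top. A hedged sketch of the ordering, where addUserJobProperties is a hypothetical stand-in for config.addJobProperties:

    import java.util.Map;
    import org.apache.hadoop.mapreduce.Job;

    class JobPropertyOrderSketch
    {
      // Hypothetical stand-in for config.addJobProperties(job): copy the
      // user's jobProperties into the job configuration.
      static void addUserJobProperties(Job job, Map<String, String> jobProperties)
      {
        for (Map.Entry<String, String> e : jobProperties.entrySet()) {
          job.getConfiguration().set(e.getKey(), e.getValue());
        }
      }

      static void setUp(Job groupByJob, Map<String, String> jobProperties)
      {
        addUserJobProperties(groupByJob, jobProperties); // user properties first...
        groupByJob.setNumReduceTasks(1); // ...then explicit settings win (count illustrative)
      }
    }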


@@ -126,6 +126,8 @@ public class DeterminePartitionsJob implements Jobby
 );
 JobHelper.injectSystemProperties(groupByJob);
+config.addJobProperties(groupByJob);
 groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class);
 groupByJob.setMapOutputKeyClass(BytesWritable.class);
 groupByJob.setMapOutputValueClass(NullWritable.class);
@@ -137,7 +139,6 @@ public class DeterminePartitionsJob implements Jobby
 JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), groupByJob);
 config.addInputPaths(groupByJob);
-config.addJobProperties(groupByJob);
 config.intoConfiguration(groupByJob);
 FileOutputFormat.setOutputPath(groupByJob, config.makeGroupedDataDir());
@@ -163,6 +164,7 @@ public class DeterminePartitionsJob implements Jobby
 dimSelectionJob.getConfiguration().set("io.sort.record.percent", "0.19");
 JobHelper.injectSystemProperties(dimSelectionJob);
+config.addJobProperties(dimSelectionJob);
 if (!config.getPartitionsSpec().isAssumeGrouped()) {
 // Read grouped data from the groupByJob.
@@ -186,7 +188,6 @@ public class DeterminePartitionsJob implements Jobby
 dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size());
 JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), dimSelectionJob);
-config.addJobProperties(dimSelectionJob);
 config.intoConfiguration(dimSelectionJob);
 FileOutputFormat.setOutputPath(dimSelectionJob, config.makeIntermediatePath());
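
The reordering matters most in the hunk directly above: the dim-selection job must run exactly one reducer per time bucket, so a user-supplied mapreduce.job.reduces applied after setNumReduceTasks would silently break per-interval partition determination. A self-contained sketch of the invariant the change restores, with an illustrative bucket count:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    class DimSelectionReducerSketch
    {
      static void verify() throws IOException
      {
        Job dimSelectionJob = Job.getInstance(new Configuration(), "dim-selection");
        // User property, applied first under the new ordering:
        dimSelectionJob.getConfiguration().set("mapreduce.job.reduces", "0");
        int bucketCount = 3; // illustrative; Druid uses bucketIntervals().get().size()
        dimSelectionJob.setNumReduceTasks(bucketCount); // applied later, so it wins
        assert dimSelectionJob.getNumReduceTasks() == bucketCount;
      }
    }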


@@ -148,6 +148,7 @@ public class IndexGeneratorJob implements Jobby
 job.getConfiguration().set("io.sort.record.percent", "0.23");
 JobHelper.injectSystemProperties(job);
+config.addJobProperties(job);
 job.setMapperClass(IndexGeneratorMapper.class);
 job.setMapOutputValueClass(BytesWritable.class);
@@ -173,7 +174,6 @@ public class IndexGeneratorJob implements Jobby
 job.setOutputFormatClass(IndexGeneratorOutputFormat.class);
 FileOutputFormat.setOutputPath(job, config.makeIntermediatePath());
-config.addJobProperties(job);
 config.addInputPaths(job);
 // hack to get druid.processing.bitmap property passed down to hadoop job.
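
The "hack" the comment above refers to is, in essence, forwarding a JVM system property into the job configuration so that remote mapper and reducer JVMs can see it. A hedged sketch of that idea; the property key shown is an assumption inferred from the comment, and Druid's actual mechanism may differ:

    import org.apache.hadoop.mapreduce.Job;

    class BitmapPropertySketch
    {
      static void forwardBitmapProperty(Job job)
      {
        // Assumed key name, inferred from the comment in the diff above.
        String bitmapType = System.getProperty("druid.processing.bitmap.type");
        if (bitmapType != null) {
          job.getConfiguration().set("druid.processing.bitmap.type", bitmapType);
        }
      }
    }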


@@ -52,6 +52,7 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeComparator;
@@ -362,7 +363,7 @@ public class IndexGeneratorJobTest
 false,
 false,
 false,
-null,
+ImmutableMap.of(JobContext.NUM_REDUCES, "0"), // verifies that a user-set number of reducers is ignored
 false,
 false,
 false,
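
The changed argument above is the regression test for this whole PR: the parameterized spec's jobProperties now request zero reduce tasks, and the test can only pass if the index-generator job ignores that request in favor of Druid's explicitly computed reducer count. The essence of what the test feeds in, as a sketch rather than the test's actual code:

    import com.google.common.collect.ImmutableMap;
    import java.util.Map;
    import org.apache.hadoop.mapreduce.JobContext;

    class NumReducesOverrideTestSketch
    {
      // The map handed to the indexing spec in the test: JobContext.NUM_REDUCES
      // is "mapreduce.job.reduces", so this asks Hadoop for zero reducers and
      // would previously have clobbered Druid's setNumReduceTasks(...) call.
      static final Map<String, String> JOB_PROPERTIES =
          ImmutableMap.of(JobContext.NUM_REDUCES, "0");
    }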