From adc00f2bcfe2bfec5acb5239f87f519ec4194bb2 Mon Sep 17 00:00:00 2001 From: fjy Date: Wed, 4 Jun 2014 16:24:56 -0700 Subject: [PATCH 1/2] make combine text configurable --- .../indexer/DetermineHashedPartitionsJob.java | 10 +++++--- .../indexer/HadoopDruidIndexerConfig.java | 5 ++++ .../io/druid/indexer/HadoopIngestionSpec.java | 7 +++++- .../io/druid/indexer/HadoopTuningConfig.java | 23 +++++++++++++++---- 4 files changed, 36 insertions(+), 9 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java index 82643065123..4bc9cb9f35a 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java @@ -33,8 +33,8 @@ import com.metamx.common.logger.Logger; import io.druid.data.input.InputRow; import io.druid.data.input.Rows; import io.druid.granularity.QueryGranularity; -import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.timeline.partition.HashBasedNumberedShardSpec; import io.druid.timeline.partition.NoneShardSpec; import org.apache.hadoop.conf.Configurable; @@ -48,8 +48,8 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.joda.time.DateTime; @@ -92,7 +92,11 @@ public class DetermineHashedPartitionsJob implements Jobby ); JobHelper.injectSystemProperties(groupByJob); - groupByJob.setInputFormatClass(CombineTextInputFormat.class); + if (config.isCombineText()) { + groupByJob.setInputFormatClass(CombineTextInputFormat.class); + } else { + groupByJob.setInputFormatClass(TextInputFormat.class); + } groupByJob.setMapperClass(DetermineCardinalityMapper.class); groupByJob.setMapOutputKeyClass(LongWritable.class); groupByJob.setMapOutputValueClass(BytesWritable.class); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index f761e34be7f..0a863b78c13 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -255,6 +255,11 @@ public class HadoopDruidIndexerConfig return (schema.getIOConfig().getMetadataUpdateSpec() != null); } + public boolean isCombineText() + { + return schema.getTuningConfig().isCombineText(); + } + public StringInputRowParser getParser() { return (StringInputRowParser) schema.getDataSchema().getParser(); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java index 9c447ac53e8..b9947e81fe9 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java @@ -73,6 +73,7 @@ public class HadoopIngestionSpec extends IngestionSpec jobProperties, + final @JsonProperty("combineText") boolean combineText, // These fields are deprecated and will be removed in the future final @JsonProperty("timestampColumn") String timestampColumn, final @JsonProperty("timestampFormat") String timestampFormat, @@ -163,7 +164,8 @@ public class HadoopIngestionSpec extends IngestionSpec jobProperties; + private final boolean combineText; @JsonCreator public HadoopTuningConfig( @@ -78,7 +80,8 @@ public class HadoopTuningConfig implements TuningConfig final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure, final @JsonProperty("overwriteFiles") boolean overwriteFiles, final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows, - final @JsonProperty("jobProperties") Map jobProperties + final @JsonProperty("jobProperties") Map jobProperties, + final @JsonProperty("combineText") boolean combineText ) { this.workingPath = workingPath == null ? null : workingPath; @@ -93,6 +96,7 @@ public class HadoopTuningConfig implements TuningConfig this.jobProperties = (jobProperties == null ? ImmutableMap.of() : ImmutableMap.copyOf(jobProperties)); + this.combineText = combineText; } @JsonProperty @@ -155,6 +159,12 @@ public class HadoopTuningConfig implements TuningConfig return jobProperties; } + @JsonProperty + public boolean isCombineText() + { + return combineText; + } + public HadoopTuningConfig withWorkingPath(String path) { return new HadoopTuningConfig( @@ -167,7 +177,8 @@ public class HadoopTuningConfig implements TuningConfig cleanupOnFailure, overwriteFiles, ignoreInvalidRows, - jobProperties + jobProperties, + combineText ); } @@ -183,7 +194,8 @@ public class HadoopTuningConfig implements TuningConfig cleanupOnFailure, overwriteFiles, ignoreInvalidRows, - jobProperties + jobProperties, + combineText ); } @@ -199,7 +211,8 @@ public class HadoopTuningConfig implements TuningConfig cleanupOnFailure, overwriteFiles, ignoreInvalidRows, - jobProperties + jobProperties, + combineText ); } } From 9f4cc5ca1fd3834d04fc47fd00d452d81d6c176c Mon Sep 17 00:00:00 2001 From: fjy Date: Wed, 4 Jun 2014 16:29:20 -0700 Subject: [PATCH 2/2] fix test --- .../test/java/io/druid/indexing/common/task/TaskSerdeTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index f0feae9490a..00407381afc 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -422,6 +422,7 @@ public class TaskSerdeTest null, false, ImmutableMap.of("foo", "bar"), + false, null, null, null,