Merge pull request #581 from metamx/combine-text

Make combine text input format configurable
This commit is contained in:
Gian Merlino 2014-06-04 16:40:53 -07:00
commit ceece7ce36
5 changed files with 37 additions and 9 deletions

View File

@ -33,8 +33,8 @@ import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configurable;
@ -48,8 +48,8 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.joda.time.DateTime;
@ -92,7 +92,11 @@ public class DetermineHashedPartitionsJob implements Jobby
);
JobHelper.injectSystemProperties(groupByJob);
groupByJob.setInputFormatClass(CombineTextInputFormat.class);
if (config.isCombineText()) {
groupByJob.setInputFormatClass(CombineTextInputFormat.class);
} else {
groupByJob.setInputFormatClass(TextInputFormat.class);
}
groupByJob.setMapperClass(DetermineCardinalityMapper.class);
groupByJob.setMapOutputKeyClass(LongWritable.class);
groupByJob.setMapOutputValueClass(BytesWritable.class);

View File

@ -255,6 +255,11 @@ public class HadoopDruidIndexerConfig
return (schema.getIOConfig().getMetadataUpdateSpec() != null);
}
/**
 * Reports the {@code combineText} setting carried by this config's schema.
 *
 * <p>This is a pure delegation: the value is whatever {@code isCombineText()} returns on the
 * tuning config obtained from {@code schema}.
 *
 * @return the tuning config's {@code combineText} flag
 */
public boolean isCombineText()
{
return schema.getTuningConfig().isCombineText();
}
public StringInputRowParser getParser()
{
return (StringInputRowParser) schema.getDataSchema().getParser();

View File

@ -73,6 +73,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
final @JsonProperty("jobProperties") Map<String, String> jobProperties,
final @JsonProperty("combineText") boolean combineText,
// These fields are deprecated and will be removed in the future
final @JsonProperty("timestampColumn") String timestampColumn,
final @JsonProperty("timestampFormat") String timestampFormat,
@ -163,7 +164,8 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
cleanupOnFailure,
overwriteFiles,
ignoreInvalidRows,
jobProperties
jobProperties,
combineText
);
}
}
@ -212,6 +214,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
null,
false,
null,
false,
null,
null,
null,
@ -244,6 +247,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
null,
false,
null,
false,
null,
null,
null,
@ -276,6 +280,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
null,
false,
null,
false,
null,
null,
null,

View File

@ -52,7 +52,8 @@ public class HadoopTuningConfig implements TuningConfig
true,
false,
false,
null
null,
false
);
}
@ -66,6 +67,7 @@ public class HadoopTuningConfig implements TuningConfig
private final boolean overwriteFiles;
private final boolean ignoreInvalidRows;
private final Map<String, String> jobProperties;
private final boolean combineText;
@JsonCreator
public HadoopTuningConfig(
@ -78,7 +80,8 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
final @JsonProperty("jobProperties") Map<String, String> jobProperties
final @JsonProperty("jobProperties") Map<String, String> jobProperties,
final @JsonProperty("combineText") boolean combineText
)
{
this.workingPath = workingPath == null ? null : workingPath;
@ -93,6 +96,7 @@ public class HadoopTuningConfig implements TuningConfig
this.jobProperties = (jobProperties == null
? ImmutableMap.<String, String>of()
: ImmutableMap.copyOf(jobProperties));
this.combineText = combineText;
}
@JsonProperty
@ -155,6 +159,12 @@ public class HadoopTuningConfig implements TuningConfig
return jobProperties;
}
/**
 * Returns the {@code combineText} flag held by this tuning config.
 *
 * <p>The field is final and is assigned from the constructor's {@code combineText} argument;
 * the getter is annotated with {@code @JsonProperty} so the flag is exposed as a JSON property.
 *
 * @return the {@code combineText} value this instance was constructed with
 */
@JsonProperty
public boolean isCombineText()
{
return combineText;
}
public HadoopTuningConfig withWorkingPath(String path)
{
return new HadoopTuningConfig(
@ -167,7 +177,8 @@ public class HadoopTuningConfig implements TuningConfig
cleanupOnFailure,
overwriteFiles,
ignoreInvalidRows,
jobProperties
jobProperties,
combineText
);
}
@ -183,7 +194,8 @@ public class HadoopTuningConfig implements TuningConfig
cleanupOnFailure,
overwriteFiles,
ignoreInvalidRows,
jobProperties
jobProperties,
combineText
);
}
@ -199,7 +211,8 @@ public class HadoopTuningConfig implements TuningConfig
cleanupOnFailure,
overwriteFiles,
ignoreInvalidRows,
jobProperties
jobProperties,
combineText
);
}
}

View File

@ -422,6 +422,7 @@ public class TaskSerdeTest
null,
false,
ImmutableMap.of("foo", "bar"),
false,
null,
null,
null,