mirror of https://github.com/apache/druid.git

Merge pull request #56 from metamx/determine-partitions

Determine partitions better

commit ec034ddef4
@@ -19,6 +19,7 @@

 package com.metamx.druid.indexer.data;

+import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.metamx.common.exception.FormattedException;
@@ -56,7 +57,18 @@ public class StringInputRowParser

     this.dimensionExclusions = Sets.newHashSet();
     if (dimensionExclusions != null) {
-      this.dimensionExclusions.addAll(dimensionExclusions);
+      this.dimensionExclusions.addAll(
+          Lists.transform(
+              dimensionExclusions, new Function<String, String>()
+              {
+                @Override
+                public String apply(String s)
+                {
+                  return s.toLowerCase();
+                }
+              }
+          )
+      );
     }
     this.dimensionExclusions.add(timestampSpec.getTimestampColumn());

@@ -23,21 +23,23 @@ import com.google.common.base.Charsets;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.PeekingIterator;
 import com.google.common.io.Closeables;
-import com.metamx.common.IAE;
+import com.metamx.common.ISE;
-import com.metamx.common.Pair;
 import com.metamx.common.guava.nary.BinaryFn;
 import com.metamx.common.logger.Logger;
-import com.metamx.common.parsers.Parser;
-import com.metamx.common.parsers.ParserUtils;
 import com.metamx.druid.CombiningIterable;
+import com.metamx.druid.QueryGranularity;
+import com.metamx.druid.input.InputRow;
 import com.metamx.druid.shard.NoneShardSpec;
 import com.metamx.druid.shard.ShardSpec;
 import com.metamx.druid.shard.SingleDimensionShardSpec;
@@ -45,7 +47,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.InvalidJobConfException;
@@ -56,8 +58,11 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.codehaus.jackson.type.TypeReference;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeComparator;
@@ -65,20 +70,26 @@ import org.joda.time.Interval;

 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;

 /**
+ * Determines appropriate ShardSpecs for a job by determining whether or not partitioning is necessary, and if so,
+ * choosing the highest cardinality dimension that satisfies the criteria:
+ *
+ * <ul>
+ * <li>Must have exactly one value per row.</li>
+ * <li>Must not generate oversized partitions. A dimension with N rows having the same value will necessarily
+ * put all those rows in the same partition, and that partition may be much larger than the target size.</li>
+ * </ul>
 */
 public class DeterminePartitionsJob implements Jobby
 {
   private static final Logger log = new Logger(DeterminePartitionsJob.class);

-  private static final Joiner keyJoiner = Joiner.on(",");
-  private static final Splitter keySplitter = Splitter.on(",");
   private static final Joiner tabJoiner = HadoopDruidIndexerConfig.tabJoiner;
   private static final Splitter tabSplitter = HadoopDruidIndexerConfig.tabSplitter;

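The Javadoc added above spells out the selection rule: consider only dimensions that are single-valued on every row and that cannot produce an oversized partition, then take the one with the highest cardinality. As a rough standalone illustration of that rule only (not the job's actual code, which runs inside the Hadoop reducers below; the class name, the counts map, and the oversizeThreshold parameter are hypothetical):

import java.util.Map;

// Sketch: pick the highest-cardinality dimension whose per-value row counts are all usable.
// valueCounts maps dimension -> (value -> row count); a negative count marks a dimension that
// was seen with zero or multiple values on some row and is therefore unusable ("poisoned").
class PartitionDimensionChooserSketch
{
  static String choose(Map<String, Map<String, Integer>> valueCounts, int targetPartitionSize, double oversizeThreshold)
  {
    String best = null;
    int bestCardinality = -1;
    for (Map.Entry<String, Map<String, Integer>> dim : valueCounts.entrySet()) {
      boolean usable = true;
      for (int rows : dim.getValue().values()) {
        // Any single value carrying too many rows would force an oversized partition.
        if (rows < 0 || rows > targetPartitionSize * oversizeThreshold) {
          usable = false;
          break;
        }
      }
      if (usable && dim.getValue().size() > bestCardinality) {
        bestCardinality = dim.getValue().size();
        best = dim.getKey();
      }
    }
    return best; // null when no dimension satisfies the criteria
  }
}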
@ -91,146 +102,314 @@ public class DeterminePartitionsJob implements Jobby
|
||||||
this.config = config;
|
this.config = config;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean run()
|
public static void injectSystemProperties(Job job)
|
||||||
{
|
{
|
||||||
try {
|
final Configuration conf = job.getConfiguration();
|
||||||
Job job = new Job(
|
for (String propName : System.getProperties().stringPropertyNames()) {
|
||||||
new Configuration(),
|
if (propName.startsWith("hadoop.")) {
|
||||||
String.format("%s-determine_partitions-%s", config.getDataSource(), config.getIntervals())
|
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
|
||||||
);
|
|
||||||
|
|
||||||
job.getConfiguration().set("io.sort.record.percent", "0.19");
|
|
||||||
for (String propName : System.getProperties().stringPropertyNames()) {
|
|
||||||
Configuration conf = job.getConfiguration();
|
|
||||||
if (propName.startsWith("hadoop.")) {
|
|
||||||
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
job.setInputFormatClass(TextInputFormat.class);
|
|
||||||
|
|
||||||
job.setMapperClass(DeterminePartitionsMapper.class);
|
|
||||||
job.setMapOutputValueClass(Text.class);
|
|
||||||
|
|
||||||
SortableBytes.useSortableBytesAsKey(job);
|
|
||||||
|
|
||||||
job.setCombinerClass(DeterminePartitionsCombiner.class);
|
|
||||||
job.setReducerClass(DeterminePartitionsReducer.class);
|
|
||||||
job.setOutputKeyClass(BytesWritable.class);
|
|
||||||
job.setOutputValueClass(Text.class);
|
|
||||||
job.setOutputFormatClass(DeterminePartitionsJob.DeterminePartitionsOutputFormat.class);
|
|
||||||
FileOutputFormat.setOutputPath(job, config.makeIntermediatePath());
|
|
||||||
|
|
||||||
config.addInputPaths(job);
|
|
||||||
config.intoConfiguration(job);
|
|
||||||
|
|
||||||
job.setJarByClass(DeterminePartitionsJob.class);
|
|
||||||
|
|
||||||
job.submit();
|
|
||||||
log.info("Job submitted, status available at %s", job.getTrackingURL());
|
|
||||||
|
|
||||||
final boolean retVal = job.waitForCompletion(true);
|
|
||||||
|
|
||||||
if (retVal) {
|
|
||||||
log.info("Job completed, loading up partitions for intervals[%s].", config.getSegmentGranularIntervals());
|
|
||||||
FileSystem fileSystem = null;
|
|
||||||
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
|
|
||||||
int shardCount = 0;
|
|
||||||
for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
|
|
||||||
DateTime bucket = segmentGranularity.getStart();
|
|
||||||
|
|
||||||
final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0));
|
|
||||||
if (fileSystem == null) {
|
|
||||||
fileSystem = partitionInfoPath.getFileSystem(job.getConfiguration());
|
|
||||||
}
|
|
||||||
if (fileSystem.exists(partitionInfoPath)) {
|
|
||||||
List<ShardSpec> specs = config.jsonMapper.readValue(
|
|
||||||
Utils.openInputStream(job, partitionInfoPath), new TypeReference<List<ShardSpec>>()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
|
|
||||||
for (int i = 0; i < specs.size(); ++i) {
|
|
||||||
actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
|
|
||||||
log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
|
|
||||||
}
|
|
||||||
|
|
||||||
shardSpecs.put(bucket, actualSpecs);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
log.info("Path[%s] didn't exist!?", partitionInfoPath);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
config.setShardSpecs(shardSpecs);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
log.info("Job completed unsuccessfully.");
|
|
||||||
}
|
|
||||||
|
|
||||||
return retVal;
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class DeterminePartitionsMapper extends Mapper<LongWritable, Text, BytesWritable, Text>
|
public boolean run()
|
||||||
{
|
{
|
||||||
private HadoopDruidIndexerConfig config;
|
try {
|
||||||
private String partitionDimension;
|
/*
|
||||||
private Parser parser;
|
* Group by (timestamp, dimensions) so we can correctly count dimension values as they would appear
|
||||||
private Function<String, DateTime> timestampConverter;
|
* in the final segment.
|
||||||
|
*/
|
||||||
|
|
||||||
|
if(!config.getPartitionsSpec().isAssumeGrouped()) {
|
||||||
|
final Job groupByJob = new Job(
|
||||||
|
new Configuration(),
|
||||||
|
String.format("%s-determine_partitions_groupby-%s", config.getDataSource(), config.getIntervals())
|
||||||
|
);
|
||||||
|
|
||||||
|
injectSystemProperties(groupByJob);
|
||||||
|
groupByJob.setInputFormatClass(TextInputFormat.class);
|
||||||
|
groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class);
|
||||||
|
groupByJob.setMapOutputKeyClass(BytesWritable.class);
|
||||||
|
groupByJob.setMapOutputValueClass(NullWritable.class);
|
||||||
|
groupByJob.setCombinerClass(DeterminePartitionsGroupByReducer.class);
|
||||||
|
groupByJob.setReducerClass(DeterminePartitionsGroupByReducer.class);
|
||||||
|
groupByJob.setOutputKeyClass(BytesWritable.class);
|
||||||
|
groupByJob.setOutputValueClass(NullWritable.class);
|
||||||
|
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
|
||||||
|
groupByJob.setJarByClass(DeterminePartitionsJob.class);
|
||||||
|
|
||||||
|
config.addInputPaths(groupByJob);
|
||||||
|
config.intoConfiguration(groupByJob);
|
||||||
|
FileOutputFormat.setOutputPath(groupByJob, config.makeGroupedDataDir());
|
||||||
|
|
||||||
|
groupByJob.submit();
|
||||||
|
log.info("Job %s submitted, status available at: %s", groupByJob.getJobName(), groupByJob.getTrackingURL());
|
||||||
|
|
||||||
|
if(!groupByJob.waitForCompletion(true)) {
|
||||||
|
log.error("Job failed: %s", groupByJob.getJobID().toString());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.info("Skipping group-by job.");
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Read grouped data and determine appropriate partitions.
|
||||||
|
*/
|
||||||
|
final Job dimSelectionJob = new Job(
|
||||||
|
new Configuration(),
|
||||||
|
String.format("%s-determine_partitions_dimselection-%s", config.getDataSource(), config.getIntervals())
|
||||||
|
);
|
||||||
|
|
||||||
|
dimSelectionJob.getConfiguration().set("io.sort.record.percent", "0.19");
|
||||||
|
|
||||||
|
injectSystemProperties(dimSelectionJob);
|
||||||
|
|
||||||
|
if(!config.getPartitionsSpec().isAssumeGrouped()) {
|
||||||
|
// Read grouped data from the groupByJob.
|
||||||
|
dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionPostGroupByMapper.class);
|
||||||
|
dimSelectionJob.setInputFormatClass(SequenceFileInputFormat.class);
|
||||||
|
FileInputFormat.addInputPath(dimSelectionJob, config.makeGroupedDataDir());
|
||||||
|
} else {
|
||||||
|
// Directly read the source data, since we assume it's already grouped.
|
||||||
|
dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionAssumeGroupedMapper.class);
|
||||||
|
dimSelectionJob.setInputFormatClass(TextInputFormat.class);
|
||||||
|
config.addInputPaths(dimSelectionJob);
|
||||||
|
}
|
||||||
|
|
||||||
|
SortableBytes.useSortableBytesAsMapOutputKey(dimSelectionJob);
|
||||||
|
dimSelectionJob.setMapOutputValueClass(Text.class);
|
||||||
|
dimSelectionJob.setCombinerClass(DeterminePartitionsDimSelectionCombiner.class);
|
||||||
|
dimSelectionJob.setReducerClass(DeterminePartitionsDimSelectionReducer.class);
|
||||||
|
dimSelectionJob.setOutputKeyClass(BytesWritable.class);
|
||||||
|
dimSelectionJob.setOutputValueClass(Text.class);
|
||||||
|
dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
|
||||||
|
dimSelectionJob.setJarByClass(DeterminePartitionsJob.class);
|
||||||
|
|
||||||
|
config.intoConfiguration(dimSelectionJob);
|
||||||
|
FileOutputFormat.setOutputPath(dimSelectionJob, config.makeIntermediatePath());
|
||||||
|
|
||||||
|
dimSelectionJob.submit();
|
||||||
|
log.info(
|
||||||
|
"Job %s submitted, status available at: %s",
|
||||||
|
dimSelectionJob.getJobName(),
|
||||||
|
dimSelectionJob.getTrackingURL()
|
||||||
|
);
|
||||||
|
|
||||||
|
if(!dimSelectionJob.waitForCompletion(true)) {
|
||||||
|
log.error("Job failed: %s", dimSelectionJob.getJobID().toString());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Load partitions determined by the previous job.
|
||||||
|
*/
|
||||||
|
|
||||||
|
log.info("Job completed, loading up partitions for intervals[%s].", config.getSegmentGranularIntervals());
|
||||||
|
FileSystem fileSystem = null;
|
||||||
|
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
|
||||||
|
int shardCount = 0;
|
||||||
|
for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
|
||||||
|
DateTime bucket = segmentGranularity.getStart();
|
||||||
|
|
||||||
|
final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0));
|
||||||
|
if (fileSystem == null) {
|
||||||
|
fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
|
||||||
|
}
|
||||||
|
if (fileSystem.exists(partitionInfoPath)) {
|
||||||
|
List<ShardSpec> specs = config.jsonMapper.readValue(
|
||||||
|
Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
|
||||||
|
for (int i = 0; i < specs.size(); ++i) {
|
||||||
|
actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
|
||||||
|
log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
|
||||||
|
}
|
||||||
|
|
||||||
|
shardSpecs.put(bucket, actualSpecs);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
log.info("Path[%s] didn't exist!?", partitionInfoPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
config.setShardSpecs(shardSpecs);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
} catch(Exception e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class DeterminePartitionsGroupByMapper extends HadoopDruidIndexerMapper<BytesWritable, NullWritable>
|
||||||
|
{
|
||||||
|
private QueryGranularity rollupGranularity = null;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void setup(Context context)
|
protected void setup(Context context)
|
||||||
throws IOException, InterruptedException
|
throws IOException, InterruptedException
|
||||||
{
|
{
|
||||||
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
|
super.setup(context);
|
||||||
partitionDimension = config.getPartitionDimension();
|
rollupGranularity = getConfig().getRollupSpec().getRollupGranularity();
|
||||||
parser = config.getDataSpec().getParser();
|
}
|
||||||
timestampConverter = ParserUtils.createTimestampParser(config.getTimestampFormat());
|
|
||||||
|
@Override
|
||||||
|
protected void innerMap(
|
||||||
|
InputRow inputRow,
|
||||||
|
Text text,
|
||||||
|
Context context
|
||||||
|
) throws IOException, InterruptedException
|
||||||
|
{
|
||||||
|
// Create group key
|
||||||
|
// TODO -- There are more efficient ways to do this
|
||||||
|
final Map<String, Set<String>> dims = Maps.newTreeMap();
|
||||||
|
for(final String dim : inputRow.getDimensions()) {
|
||||||
|
final Set<String> dimValues = ImmutableSortedSet.copyOf(inputRow.getDimension(dim));
|
||||||
|
if(dimValues.size() > 0) {
|
||||||
|
dims.put(dim, dimValues);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
final List<Object> groupKey = ImmutableList.of(
|
||||||
|
rollupGranularity.truncate(inputRow.getTimestampFromEpoch()),
|
||||||
|
dims
|
||||||
|
);
|
||||||
|
context.write(
|
||||||
|
new BytesWritable(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey)),
|
||||||
|
NullWritable.get()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class DeterminePartitionsGroupByReducer
|
||||||
|
extends Reducer<BytesWritable, NullWritable, BytesWritable, NullWritable>
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected void reduce(
|
||||||
|
BytesWritable key,
|
||||||
|
Iterable<NullWritable> values,
|
||||||
|
Context context
|
||||||
|
) throws IOException, InterruptedException
|
||||||
|
{
|
||||||
|
context.write(key, NullWritable.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This DimSelection mapper runs on data generated by our GroupBy job.
|
||||||
|
*/
|
||||||
|
public static class DeterminePartitionsDimSelectionPostGroupByMapper
|
||||||
|
extends Mapper<BytesWritable, NullWritable, BytesWritable, Text>
|
||||||
|
{
|
||||||
|
private DeterminePartitionsDimSelectionMapperHelper helper;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setup(Context context)
|
||||||
|
throws IOException, InterruptedException
|
||||||
|
{
|
||||||
|
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
|
||||||
|
final String partitionDimension = config.getPartitionDimension();
|
||||||
|
helper = new DeterminePartitionsDimSelectionMapperHelper(config, partitionDimension);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void map(
|
protected void map(
|
||||||
LongWritable key, Text value, Context context
|
BytesWritable key, NullWritable value, Context context
|
||||||
) throws IOException, InterruptedException
|
) throws IOException, InterruptedException
|
||||||
{
|
{
|
||||||
Map<String, Object> values = parser.parse(value.toString());
|
final List<Object> timeAndDims = HadoopDruidIndexerConfig.jsonMapper.readValue(key.getBytes(), List.class);
|
||||||
final DateTime timestamp;
|
|
||||||
final String tsStr = (String) values.get(config.getTimestampColumnName());
|
|
||||||
try {
|
|
||||||
timestamp = timestampConverter.apply(tsStr);
|
|
||||||
}
|
|
||||||
catch(IllegalArgumentException e) {
|
|
||||||
if(config.isIgnoreInvalidRows()) {
|
|
||||||
context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
|
|
||||||
return; // we're ignoring this invalid row
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
final DateTime timestamp = new DateTime(timeAndDims.get(0));
|
||||||
|
final Map<String, Iterable<String>> dims = (Map<String, Iterable<String>>) timeAndDims.get(1);
|
||||||
|
|
||||||
|
helper.emitDimValueCounts(context, timestamp, dims);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This DimSelection mapper runs on raw input data that we assume has already been grouped.
|
||||||
|
*/
|
||||||
|
public static class DeterminePartitionsDimSelectionAssumeGroupedMapper
|
||||||
|
extends HadoopDruidIndexerMapper<BytesWritable, Text>
|
||||||
|
{
|
||||||
|
private DeterminePartitionsDimSelectionMapperHelper helper;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setup(Context context)
|
||||||
|
throws IOException, InterruptedException
|
||||||
|
{
|
||||||
|
super.setup(context);
|
||||||
|
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
|
||||||
|
final String partitionDimension = config.getPartitionDimension();
|
||||||
|
helper = new DeterminePartitionsDimSelectionMapperHelper(config, partitionDimension);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void innerMap(
|
||||||
|
InputRow inputRow,
|
||||||
|
Text text,
|
||||||
|
Context context
|
||||||
|
) throws IOException, InterruptedException
|
||||||
|
{
|
||||||
|
final Map<String, Iterable<String>> dims = Maps.newHashMap();
|
||||||
|
for(final String dim : inputRow.getDimensions()) {
|
||||||
|
dims.put(dim, inputRow.getDimension(dim));
|
||||||
|
}
|
||||||
|
helper.emitDimValueCounts(context, new DateTime(inputRow.getTimestampFromEpoch()), dims);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Since we have two slightly different DimSelectionMappers, this class encapsulates the shared logic for
|
||||||
|
* emitting dimension value counts.
|
||||||
|
*/
|
||||||
|
public static class DeterminePartitionsDimSelectionMapperHelper
|
||||||
|
{
|
||||||
|
private final HadoopDruidIndexerConfig config;
|
||||||
|
private final String partitionDimension;
|
||||||
|
|
||||||
|
public DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config, String partitionDimension)
|
||||||
|
{
|
||||||
|
this.config = config;
|
||||||
|
this.partitionDimension = partitionDimension;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void emitDimValueCounts(
|
||||||
|
TaskInputOutputContext<? extends Writable, ? extends Writable, BytesWritable, Text> context,
|
||||||
|
DateTime timestamp,
|
||||||
|
Map<String, Iterable<String>> dims
|
||||||
|
) throws IOException, InterruptedException
|
||||||
|
{
|
||||||
final Optional<Interval> maybeInterval = config.getGranularitySpec().bucketInterval(timestamp);
|
final Optional<Interval> maybeInterval = config.getGranularitySpec().bucketInterval(timestamp);
|
||||||
if(maybeInterval.isPresent()) {
|
|
||||||
final DateTime bucket = maybeInterval.get().getStart();
|
|
||||||
final String outKey = keyJoiner.join(bucket.toString(), partitionDimension);
|
|
||||||
|
|
||||||
final Object dimValue = values.get(partitionDimension);
|
if(!maybeInterval.isPresent()) {
|
||||||
if (! (dimValue instanceof String)) {
|
throw new ISE("WTF?! No bucket found for timestamp: %s", timestamp);
|
||||||
throw new IAE("Cannot partition on a tag-style dimension[%s], line was[%s]", partitionDimension, value);
|
}
|
||||||
|
|
||||||
|
final Interval interval = maybeInterval.get();
|
||||||
|
final byte[] groupKey = interval.getStart().toString().getBytes(Charsets.UTF_8);
|
||||||
|
|
||||||
|
for(final Map.Entry<String, Iterable<String>> dimAndValues : dims.entrySet()) {
|
||||||
|
final String dim = dimAndValues.getKey();
|
||||||
|
|
||||||
|
if(partitionDimension == null || partitionDimension.equals(dim)) {
|
||||||
|
final Iterable<String> dimValues = dimAndValues.getValue();
|
||||||
|
|
||||||
|
if(Iterables.size(dimValues) == 1) {
|
||||||
|
// Emit this value.
|
||||||
|
write(context, groupKey, new DimValueCount(dim, Iterables.getOnlyElement(dimValues), 1));
|
||||||
|
} else {
|
||||||
|
// This dimension is unsuitable for partitioning. Poison it by emitting a negative value.
|
||||||
|
write(context, groupKey, new DimValueCount(dim, "", -1));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final byte[] groupKey = outKey.getBytes(Charsets.UTF_8);
|
|
||||||
write(context, groupKey, "", 1);
|
|
||||||
write(context, groupKey, (String) dimValue, 1);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static abstract class DeterminePartitionsBaseReducer extends Reducer<BytesWritable, Text, BytesWritable, Text>
|
private static abstract class DeterminePartitionsDimSelectionBaseReducer
|
||||||
|
extends Reducer<BytesWritable, Text, BytesWritable, Text>
|
||||||
{
|
{
|
||||||
|
|
||||||
protected static volatile HadoopDruidIndexerConfig config = null;
|
protected static volatile HadoopDruidIndexerConfig config = null;
|
||||||
|
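The hunk above rewrites run() so that determining partitions is a two-pass affair: unless the partitions spec says the input is already grouped, a group-by job first deduplicates rows by (rollup-truncated timestamp, dimensions), and a second dimension-selection job then reads that grouped output (or the raw input) to count values and choose shard specs. A minimal sketch of the optional-first-pass chaining pattern, with mappers, reducers, and paths elided (hypothetical class, not the patch's code):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Sketch of the "optional group-by pass, then dimension-selection pass" flow used by the new run().
class TwoPassJobRunnerSketch
{
  public boolean run(boolean assumeGrouped) throws Exception
  {
    if (!assumeGrouped) {
      final Job groupByJob = new Job(new Configuration(), "determine_partitions_groupby");
      // ... set input format, mapper, combiner, reducer, and output path here ...
      groupByJob.submit();
      if (!groupByJob.waitForCompletion(true)) {
        return false; // bail out early if the first pass fails, as the patch does
      }
    }

    final Job dimSelectionJob = new Job(new Configuration(), "determine_partitions_dimselection");
    // ... read the grouped output (or the raw input when assumeGrouped), then pick shard specs ...
    dimSelectionJob.submit();
    return dimSelectionJob.waitForCompletion(true);
  }
}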
@@ -240,7 +419,7 @@ public class DeterminePartitionsJob implements Jobby
         throws IOException, InterruptedException
     {
       if (config == null) {
-        synchronized (DeterminePartitionsBaseReducer.class) {
+        synchronized (DeterminePartitionsDimSelectionBaseReducer.class) {
           if (config == null) {
             config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
           }
@ -255,166 +434,275 @@ public class DeterminePartitionsJob implements Jobby
|
||||||
{
|
{
|
||||||
SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
|
SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
|
||||||
|
|
||||||
final Iterable<Pair<String, Long>> combinedIterable = combineRows(values);
|
final Iterable<DimValueCount> combinedIterable = combineRows(values);
|
||||||
innerReduce(context, keyBytes, combinedIterable);
|
innerReduce(context, keyBytes, combinedIterable);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract void innerReduce(
|
protected abstract void innerReduce(
|
||||||
Context context, SortableBytes keyBytes, Iterable<Pair<String, Long>> combinedIterable
|
Context context, SortableBytes keyBytes, Iterable<DimValueCount> combinedIterable
|
||||||
) throws IOException, InterruptedException;
|
) throws IOException, InterruptedException;
|
||||||
|
|
||||||
private Iterable<Pair<String, Long>> combineRows(Iterable<Text> input)
|
private Iterable<DimValueCount> combineRows(Iterable<Text> input)
|
||||||
{
|
{
|
||||||
return new CombiningIterable<Pair<String, Long>>(
|
return new CombiningIterable<DimValueCount>(
|
||||||
Iterables.transform(
|
Iterables.transform(
|
||||||
input,
|
input,
|
||||||
new Function<Text, Pair<String, Long>>()
|
new Function<Text, DimValueCount>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public Pair<String, Long> apply(Text input)
|
public DimValueCount apply(Text input)
|
||||||
{
|
{
|
||||||
Iterator<String> splits = tabSplitter.split(input.toString()).iterator();
|
return DimValueCount.fromText(input);
|
||||||
return new Pair<String, Long>(splits.next(), Long.parseLong(splits.next()));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
new Comparator<Pair<String, Long>>()
|
new Comparator<DimValueCount>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public int compare(Pair<String, Long> o1, Pair<String, Long> o2)
|
public int compare(DimValueCount o1, DimValueCount o2)
|
||||||
{
|
{
|
||||||
return o1.lhs.compareTo(o2.lhs);
|
return ComparisonChain.start().compare(o1.dim, o2.dim).compare(o1.value, o2.value).result();
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
new BinaryFn<Pair<String, Long>, Pair<String, Long>, Pair<String, Long>>()
|
new BinaryFn<DimValueCount, DimValueCount, DimValueCount>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public Pair<String, Long> apply(Pair<String, Long> arg1, Pair<String, Long> arg2)
|
public DimValueCount apply(DimValueCount arg1, DimValueCount arg2)
|
||||||
{
|
{
|
||||||
if (arg2 == null) {
|
if (arg2 == null) {
|
||||||
return arg1;
|
return arg1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return new Pair<String, Long>(arg1.lhs, arg1.rhs + arg2.rhs);
|
// Respect "poisoning" (negative values mean we can't use this dimension)
|
||||||
|
final int newNumRows = (arg1.numRows >= 0 && arg2.numRows >= 0 ? arg1.numRows + arg2.numRows : -1);
|
||||||
|
return new DimValueCount(arg1.dim, arg1.value, newNumRows);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class DeterminePartitionsCombiner extends DeterminePartitionsBaseReducer
|
public static class DeterminePartitionsDimSelectionCombiner extends DeterminePartitionsDimSelectionBaseReducer
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
protected void innerReduce(
|
protected void innerReduce(
|
||||||
Context context, SortableBytes keyBytes, Iterable<Pair<String, Long>> combinedIterable
|
Context context, SortableBytes keyBytes, Iterable<DimValueCount> combinedIterable
|
||||||
) throws IOException, InterruptedException
|
) throws IOException, InterruptedException
|
||||||
{
|
{
|
||||||
for (Pair<String, Long> pair : combinedIterable) {
|
for (DimValueCount dvc : combinedIterable) {
|
||||||
write(context, keyBytes.getGroupKey(), pair.lhs, pair.rhs);
|
write(context, keyBytes.getGroupKey(), dvc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class DeterminePartitionsReducer extends DeterminePartitionsBaseReducer
|
public static class DeterminePartitionsDimSelectionReducer extends DeterminePartitionsDimSelectionBaseReducer
|
||||||
{
|
{
|
||||||
String previousBoundary;
|
private static final double SHARD_COMBINE_THRESHOLD = 0.25;
|
||||||
long runningTotal;
|
private static final double SHARD_OVERSIZE_THRESHOLD = 1.5;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void innerReduce(
|
protected void innerReduce(
|
||||||
Context context, SortableBytes keyBytes, Iterable<Pair<String, Long>> combinedIterable
|
Context context, SortableBytes keyBytes, Iterable<DimValueCount> combinedIterable
|
||||||
) throws IOException, InterruptedException
|
) throws IOException, InterruptedException
|
||||||
{
|
{
|
||||||
PeekingIterator<Pair<String, Long>> iterator = Iterators.peekingIterator(combinedIterable.iterator());
|
PeekingIterator<DimValueCount> iterator = Iterators.peekingIterator(combinedIterable.iterator());
|
||||||
Pair<String, Long> totalPair = iterator.next();
|
|
||||||
|
|
||||||
Preconditions.checkState(totalPair.lhs.equals(""), "Total pair value was[%s]!?", totalPair.lhs);
|
// "iterator" will take us over many candidate dimensions
|
||||||
long totalRows = totalPair.rhs;
|
DimPartitions currentDimPartitions = null;
|
||||||
|
DimPartition currentDimPartition = null;
|
||||||
|
String currentDimPartitionStart = null;
|
||||||
|
boolean currentDimSkip = false;
|
||||||
|
|
||||||
long numPartitions = Math.max(totalRows / config.getTargetPartitionSize(), 1);
|
// We'll store possible partitions in here
|
||||||
long expectedRowsPerPartition = totalRows / numPartitions;
|
final Map<String, DimPartitions> dimPartitionss = Maps.newHashMap();
|
||||||
|
|
||||||
class PartitionsList extends ArrayList<ShardSpec>
|
while(iterator.hasNext()) {
|
||||||
{
|
final DimValueCount dvc = iterator.next();
|
||||||
}
|
|
||||||
List<ShardSpec> partitions = new PartitionsList();
|
|
||||||
|
|
||||||
runningTotal = 0;
|
if(currentDimPartitions == null || !currentDimPartitions.dim.equals(dvc.dim)) {
|
||||||
Pair<String, Long> prev = null;
|
// Starting a new dimension! Exciting!
|
||||||
previousBoundary = null;
|
currentDimPartitions = new DimPartitions(dvc.dim);
|
||||||
while (iterator.hasNext()) {
|
currentDimPartition = new DimPartition();
|
||||||
Pair<String, Long> curr = iterator.next();
|
currentDimPartitionStart = null;
|
||||||
|
currentDimSkip = false;
|
||||||
if (runningTotal > expectedRowsPerPartition) {
|
|
||||||
Preconditions.checkNotNull(
|
|
||||||
prev, "Prev[null] while runningTotal[%s] was > expectedRows[%s]!?", runningTotal, expectedRowsPerPartition
|
|
||||||
);
|
|
||||||
|
|
||||||
addPartition(partitions, curr.lhs);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
runningTotal += curr.rhs;
|
// Respect poisoning
|
||||||
prev = curr;
|
if(!currentDimSkip && dvc.numRows < 0) {
|
||||||
|
log.info("Cannot partition on multi-valued dimension: %s", dvc.dim);
|
||||||
|
currentDimSkip = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(currentDimSkip) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// See if we need to cut a new partition ending immediately before this dimension value
|
||||||
|
if(currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows >= config.getTargetPartitionSize()) {
|
||||||
|
final ShardSpec shardSpec = new SingleDimensionShardSpec(
|
||||||
|
currentDimPartitions.dim,
|
||||||
|
currentDimPartitionStart,
|
||||||
|
dvc.value,
|
||||||
|
currentDimPartitions.partitions.size()
|
||||||
|
);
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"Adding possible shard with %,d rows and %,d unique values: %s",
|
||||||
|
currentDimPartition.rows,
|
||||||
|
currentDimPartition.cardinality,
|
||||||
|
shardSpec
|
||||||
|
);
|
||||||
|
|
||||||
|
currentDimPartition.shardSpec = shardSpec;
|
||||||
|
currentDimPartitions.partitions.add(currentDimPartition);
|
||||||
|
currentDimPartition = new DimPartition();
|
||||||
|
currentDimPartitionStart = dvc.value;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update counters
|
||||||
|
currentDimPartition.cardinality ++;
|
||||||
|
currentDimPartition.rows += dvc.numRows;
|
||||||
|
|
||||||
|
if(!iterator.hasNext() || !currentDimPartitions.dim.equals(iterator.peek().dim)) {
|
||||||
|
// Finalize the current dimension
|
||||||
|
|
||||||
|
if(currentDimPartition.rows > 0) {
|
||||||
|
// One more shard to go
|
||||||
|
final ShardSpec shardSpec;
|
||||||
|
|
||||||
|
if (currentDimPartitions.partitions.isEmpty()) {
|
||||||
|
shardSpec = new NoneShardSpec();
|
||||||
|
} else {
|
||||||
|
if(currentDimPartition.rows < config.getTargetPartitionSize() * SHARD_COMBINE_THRESHOLD) {
|
||||||
|
// Combine with previous shard
|
||||||
|
final DimPartition previousDimPartition = currentDimPartitions.partitions.remove(
|
||||||
|
currentDimPartitions.partitions.size() - 1
|
||||||
|
);
|
||||||
|
|
||||||
|
final SingleDimensionShardSpec previousShardSpec = (SingleDimensionShardSpec) previousDimPartition.shardSpec;
|
||||||
|
|
||||||
|
shardSpec = new SingleDimensionShardSpec(
|
||||||
|
currentDimPartitions.dim,
|
||||||
|
previousShardSpec.getStart(),
|
||||||
|
null,
|
||||||
|
previousShardSpec.getPartitionNum()
|
||||||
|
);
|
||||||
|
|
||||||
|
log.info("Removing possible shard: %s", previousShardSpec);
|
||||||
|
|
||||||
|
currentDimPartition.rows += previousDimPartition.rows;
|
||||||
|
currentDimPartition.cardinality += previousDimPartition.cardinality;
|
||||||
|
} else {
|
||||||
|
// Create new shard
|
||||||
|
shardSpec = new SingleDimensionShardSpec(
|
||||||
|
currentDimPartitions.dim,
|
||||||
|
currentDimPartitionStart,
|
||||||
|
null,
|
||||||
|
currentDimPartitions.partitions.size()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"Adding possible shard with %,d rows and %,d unique values: %s",
|
||||||
|
currentDimPartition.rows,
|
||||||
|
currentDimPartition.cardinality,
|
||||||
|
shardSpec
|
||||||
|
);
|
||||||
|
|
||||||
|
currentDimPartition.shardSpec = shardSpec;
|
||||||
|
currentDimPartitions.partitions.add(currentDimPartition);
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"Completed dimension[%s]: %,d possible shards with %,d unique values",
|
||||||
|
currentDimPartitions.dim,
|
||||||
|
currentDimPartitions.partitions.size(),
|
||||||
|
currentDimPartitions.getCardinality()
|
||||||
|
);
|
||||||
|
|
||||||
|
// Add ourselves to the partitions map
|
||||||
|
dimPartitionss.put(currentDimPartitions.dim, currentDimPartitions);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (partitions.isEmpty()) {
|
// Choose best dimension
|
||||||
partitions.add(new NoneShardSpec());
|
if(dimPartitionss.isEmpty()) {
|
||||||
} else if (((double) runningTotal / (double) expectedRowsPerPartition) < 0.25) {
|
throw new ISE("No suitable partitioning dimension found!");
|
||||||
final SingleDimensionShardSpec lastSpec = (SingleDimensionShardSpec) partitions.remove(partitions.size() - 1);
|
|
||||||
partitions.add(
|
|
||||||
new SingleDimensionShardSpec(
|
|
||||||
config.getPartitionDimension(),
|
|
||||||
lastSpec.getStart(),
|
|
||||||
null,
|
|
||||||
lastSpec.getPartitionNum()
|
|
||||||
)
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
partitions.add(
|
|
||||||
new SingleDimensionShardSpec(
|
|
||||||
config.getPartitionDimension(),
|
|
||||||
previousBoundary,
|
|
||||||
null,
|
|
||||||
partitions.size()
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
DateTime bucket = new DateTime(
|
final int totalRows = dimPartitionss.values().iterator().next().getRows();
|
||||||
Iterables.get(keySplitter.split(new String(keyBytes.getGroupKey(), Charsets.UTF_8)), 0)
|
|
||||||
);
|
int maxCardinality = -1;
|
||||||
OutputStream out = Utils.makePathAndOutputStream(
|
DimPartitions maxCardinalityPartitions = null;
|
||||||
|
|
||||||
|
for(final DimPartitions dimPartitions : dimPartitionss.values()) {
|
||||||
|
if(dimPartitions.getRows() != totalRows) {
|
||||||
|
throw new ISE(
|
||||||
|
"WTF?! Dimension[%s] row count %,d != expected row count %,d",
|
||||||
|
dimPartitions.dim,
|
||||||
|
dimPartitions.getRows(),
|
||||||
|
totalRows
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure none of these shards are oversized
|
||||||
|
boolean oversized = false;
|
||||||
|
for(final DimPartition partition : dimPartitions.partitions) {
|
||||||
|
if(partition.rows > config.getTargetPartitionSize() * SHARD_OVERSIZE_THRESHOLD) {
|
||||||
|
log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec);
|
||||||
|
oversized = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(oversized) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(dimPartitions.getCardinality() > maxCardinality) {
|
||||||
|
maxCardinality = dimPartitions.getCardinality();
|
||||||
|
maxCardinalityPartitions = dimPartitions;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(maxCardinalityPartitions == null) {
|
||||||
|
throw new ISE("No suitable partitioning dimension found!");
|
||||||
|
}
|
||||||
|
|
||||||
|
final DateTime bucket = new DateTime(new String(keyBytes.getGroupKey(), Charsets.UTF_8));
|
||||||
|
final OutputStream out = Utils.makePathAndOutputStream(
|
||||||
context, config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0)), config.isOverwriteFiles()
|
context, config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0)), config.isOverwriteFiles()
|
||||||
);
|
);
|
||||||
|
|
||||||
for (ShardSpec partition : partitions) {
|
final List<ShardSpec> chosenShardSpecs = Lists.transform(
|
||||||
log.info("%s", partition);
|
maxCardinalityPartitions.partitions, new Function<DimPartition, ShardSpec>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public ShardSpec apply(DimPartition dimPartition)
|
||||||
|
{
|
||||||
|
return dimPartition.shardSpec;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
log.info("Chosen partitions:");
|
||||||
|
for (ShardSpec shardSpec : chosenShardSpecs) {
|
||||||
|
log.info(" %s", shardSpec);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
config.jsonMapper.writeValue(out, partitions);
|
HadoopDruidIndexerConfig.jsonMapper.writerWithType(new TypeReference<List<ShardSpec>>() {}).writeValue(
|
||||||
|
out,
|
||||||
|
chosenShardSpecs
|
||||||
|
);
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
Closeables.close(out, false);
|
Closeables.close(out, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addPartition(List<ShardSpec> partitions, String boundary)
|
|
||||||
{
|
|
||||||
partitions.add(
|
|
||||||
new SingleDimensionShardSpec(
|
|
||||||
config.getPartitionDimension(),
|
|
||||||
previousBoundary,
|
|
||||||
boundary,
|
|
||||||
partitions.size()
|
|
||||||
)
|
|
||||||
);
|
|
||||||
previousBoundary = boundary;
|
|
||||||
runningTotal = 0;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class DeterminePartitionsOutputFormat extends FileOutputFormat
|
public static class DeterminePartitionsDimSelectionOutputFormat extends FileOutputFormat
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public RecordWriter getRecordWriter(final TaskAttemptContext job) throws IOException, InterruptedException
|
public RecordWriter getRecordWriter(final TaskAttemptContext job) throws IOException, InterruptedException
|
||||||
|
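Inside the dimension-selection reducer above, each candidate dimension's sorted (value, row count) stream is walked once: a new partition boundary is cut just before the value that would push the current partition to the target size, and a too-small trailing partition is folded back into its predecessor (the role of SHARD_COMBINE_THRESHOLD). A standalone sketch of that cutting strategy (the Range type, method, and parameter names are hypothetical; the real code builds SingleDimensionShardSpecs):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration of cutting contiguous value ranges of roughly targetSize rows,
// then merging a too-small trailing range into its predecessor.
class RangeCutterSketch
{
  static class Range
  {
    String start; // inclusive lower bound, null = open
    String end;   // exclusive upper bound, null = open
    int rows;
  }

  // sortedValueCounts must be ordered by value, mirroring the sorted reducer input.
  static List<Range> cut(LinkedHashMap<String, Integer> sortedValueCounts, int targetSize, double combineThreshold)
  {
    final List<Range> ranges = new ArrayList<Range>();
    Range current = new Range();
    for (Map.Entry<String, Integer> e : sortedValueCounts.entrySet()) {
      if (current.rows > 0 && current.rows + e.getValue() >= targetSize) {
        current.end = e.getKey(); // cut immediately before this value
        ranges.add(current);
        current = new Range();
        current.start = e.getKey();
      }
      current.rows += e.getValue();
    }
    if (!ranges.isEmpty() && current.rows < targetSize * combineThreshold) {
      // Trailing range is too small: extend the previous range to the end instead.
      final Range previous = ranges.remove(ranges.size() - 1);
      previous.end = null;
      previous.rows += current.rows;
      ranges.add(previous);
    } else {
      current.end = null;
      ranges.add(current);
    }
    return ranges;
  }
}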
@@ -444,17 +732,81 @@ public class DeterminePartitionsJob implements Jobby
     }
   }

+  private static class DimPartitions
+  {
+    public final String dim;
+    public final List<DimPartition> partitions = Lists.newArrayList();
+
+    private DimPartitions(String dim)
+    {
+      this.dim = dim;
+    }
+
+    public int getCardinality()
+    {
+      int sum = 0;
+      for(final DimPartition dimPartition : partitions) {
+        sum += dimPartition.cardinality;
+      }
+      return sum;
+    }
+
+    public int getRows()
+    {
+      int sum = 0;
+      for(final DimPartition dimPartition : partitions) {
+        sum += dimPartition.rows;
+      }
+      return sum;
+    }
+  }
+
+  private static class DimPartition
+  {
+    public ShardSpec shardSpec = null;
+    public int cardinality = 0;
+    public int rows = 0;
+  }
+
+  private static class DimValueCount
+  {
+    public final String dim;
+    public final String value;
+    public final int numRows;
+
+    private DimValueCount(String dim, String value, int numRows)
+    {
+      this.dim = dim;
+      this.value = value;
+      this.numRows = numRows;
+    }
+
+    public Text toText()
+    {
+      return new Text(tabJoiner.join(dim, String.valueOf(numRows), value));
+    }
+
+    public static DimValueCount fromText(Text text)
+    {
+      final Iterator<String> splits = tabSplitter.limit(3).split(text.toString()).iterator();
+      final String dim = splits.next();
+      final int numRows = Integer.parseInt(splits.next());
+      final String value = splits.next();
+
+      return new DimValueCount(dim, value, numRows);
+    }
+  }
+
   private static void write(
       TaskInputOutputContext<? extends Writable, ? extends Writable, BytesWritable, Text> context,
       final byte[] groupKey,
-      String value,
-      long numRows
+      DimValueCount dimValueCount
   )
       throws IOException, InterruptedException
   {
     context.write(
-        new SortableBytes(groupKey, value.getBytes(HadoopDruidIndexerConfig.javaNativeCharset)).toBytesWritable(),
-        new Text(tabJoiner.join(value, numRows))
+        new SortableBytes(groupKey, tabJoiner.join(dimValueCount.dim, dimValueCount.value).getBytes(HadoopDruidIndexerConfig.javaNativeCharset)).toBytesWritable(),
+        dimValueCount.toText()
     );
   }
 }
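DimValueCount above is shuttled between mappers, combiners, and reducers as a tab-joined Text of the form dim<TAB>numRows<TAB>value; putting the value last and splitting with limit(3) means a tab inside the value itself cannot break the numRows field. A small self-contained sketch of that encode/decode round trip using plain Guava (the example strings are hypothetical):

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;

import java.util.Iterator;

// Round-trip of the "dim \t numRows \t value" encoding used by DimValueCount.toText()/fromText().
class DimValueCountCodecSketch
{
  private static final Joiner tabJoiner = Joiner.on("\t");
  private static final Splitter tabSplitter = Splitter.on("\t");

  public static void main(String[] args)
  {
    final String encoded = tabJoiner.join("host", String.valueOf(12), "web\t01.example.com");

    // limit(3) stops splitting after three pieces, so a tab inside the value survives intact.
    final Iterator<String> splits = tabSplitter.limit(3).split(encoded).iterator();
    final String dim = splits.next();
    final int numRows = Integer.parseInt(splits.next());
    final String value = splits.next();

    System.out.println(dim + " -> " + value + " (" + numRows + " rows)");
  }
}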
@@ -34,15 +34,20 @@ import com.metamx.common.MapUtils;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.RegisteringNode;
+import com.metamx.druid.aggregation.AggregatorFactory;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.index.v1.serde.Registererer;
 import com.metamx.druid.indexer.data.DataSpec;
+import com.metamx.druid.indexer.data.StringInputRowParser;
+import com.metamx.druid.indexer.data.TimestampSpec;
 import com.metamx.druid.indexer.data.ToLowercaseDataSpec;
 import com.metamx.druid.indexer.granularity.GranularitySpec;
 import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
+import com.metamx.druid.indexer.partitions.PartitionsSpec;
 import com.metamx.druid.indexer.path.PathSpec;
 import com.metamx.druid.indexer.rollup.DataRollupSpec;
 import com.metamx.druid.indexer.updater.UpdaterJobSpec;
+import com.metamx.druid.input.InputRow;
 import com.metamx.druid.jackson.DefaultObjectMapper;
 import com.metamx.druid.shard.ShardSpec;
 import com.metamx.druid.utils.JodaUtils;
@@ -50,6 +55,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.annotate.JsonCreator;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
@@ -60,8 +66,6 @@ import org.joda.time.format.ISODateTimeFormat;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.Collections;
@@ -162,8 +166,6 @@ public class HadoopDruidIndexerConfig

   private static final String CONFIG_PROPERTY = "druid.indexer.config";

-  @Deprecated
-  private volatile List<Interval> intervals;
   private volatile String dataSource;
   private volatile String timestampColumnName;
   private volatile String timestampFormat;
@@ -175,8 +177,7 @@ public class HadoopDruidIndexerConfig
   private volatile String jobOutputDir;
   private volatile String segmentOutputDir;
   private volatile DateTime version = new DateTime();
-  private volatile String partitionDimension;
-  private volatile Long targetPartitionSize;
+  private volatile PartitionsSpec partitionsSpec;
   private volatile boolean leaveIntermediate = false;
   private volatile boolean cleanupOnFailure = true;
   private volatile Map<DateTime, List<HadoopyShardSpec>> shardSpecs = ImmutableMap.of();
@@ -186,22 +187,97 @@ public class HadoopDruidIndexerConfig
   private volatile boolean ignoreInvalidRows = false;
   private volatile List<String> registererers = Lists.newArrayList();

+  @JsonCreator
+  public HadoopDruidIndexerConfig(
+      final @JsonProperty("intervals") List<Interval> intervals,
+      final @JsonProperty("dataSource") String dataSource,
+      final @JsonProperty("timestampColumnName") String timestampColumnName,
+      final @JsonProperty("timestampFormat") String timestampFormat,
+      final @JsonProperty("dataSpec") DataSpec dataSpec,
+      final @JsonProperty("segmentGranularity") Granularity segmentGranularity,
+      final @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
+      final @JsonProperty("pathSpec") PathSpec pathSpec,
+      final @JsonProperty("jobOutputDir") String jobOutputDir,
+      final @JsonProperty("segmentOutputDir") String segmentOutputDir,
+      final @JsonProperty("version") DateTime version,
+      final @JsonProperty("partitionDimension") String partitionDimension,
+      final @JsonProperty("targetPartitionSize") Long targetPartitionSize,
+      final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
+      final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
+      final @JsonProperty("cleanupOnFailure") boolean cleanupOnFailure,
+      final @JsonProperty("shardSpecs") Map<DateTime, List<HadoopyShardSpec>> shardSpecs,
+      final @JsonProperty("overwriteFiles") boolean overwriteFiles,
+      final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
+      final @JsonProperty("updaterJobSpec") UpdaterJobSpec updaterJobSpec,
+      final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
+      final @JsonProperty("registererers") List<String> registererers
+  )
+  {
+    this.dataSource = dataSource;
+    this.timestampColumnName = timestampColumnName;
+    this.timestampFormat = timestampFormat;
+    this.dataSpec = dataSpec;
+    this.granularitySpec = granularitySpec;
+    this.pathSpec = pathSpec;
+    this.jobOutputDir = jobOutputDir;
+    this.segmentOutputDir = segmentOutputDir;
+    this.version = version;
+    this.partitionsSpec = partitionsSpec;
+    this.leaveIntermediate = leaveIntermediate;
+    this.cleanupOnFailure = cleanupOnFailure;
+    this.shardSpecs = shardSpecs;
+    this.overwriteFiles = overwriteFiles;
+    this.rollupSpec = rollupSpec;
+    this.updaterJobSpec = updaterJobSpec;
+    this.ignoreInvalidRows = ignoreInvalidRows;
+    this.registererers = registererers;
+
+    if(partitionsSpec != null) {
+      Preconditions.checkArgument(
+          partitionDimension == null && targetPartitionSize == null,
+          "Cannot mix partitionsSpec with partitionDimension/targetPartitionSize"
+      );
+
+      this.partitionsSpec = partitionsSpec;
+    } else {
+      // Backwards compatibility
+      this.partitionsSpec = new PartitionsSpec(partitionDimension, targetPartitionSize, false);
+    }
+
+    if(granularitySpec != null) {
+      Preconditions.checkArgument(
+          segmentGranularity == null && intervals == null,
+          "Cannot mix granularitySpec with segmentGranularity/intervals"
+      );
+    } else {
+      // Backwards compatibility
+      this.segmentGranularity = segmentGranularity;
+      if(segmentGranularity != null && intervals != null) {
+        this.granularitySpec = new UniformGranularitySpec(segmentGranularity, intervals);
+      }
+    }
+  }
+
+  /**
+   * Default constructor does nothing. The caller is expected to use the various setX methods.
+   */
+  public HadoopDruidIndexerConfig()
+  {
+  }
+
   public List<Interval> getIntervals()
   {
     return JodaUtils.condenseIntervals(getGranularitySpec().bucketIntervals());
   }

   @Deprecated
-  @JsonProperty
   public void setIntervals(List<Interval> intervals)
   {
-    Preconditions.checkState(this.granularitySpec == null, "Use setGranularitySpec");
+    Preconditions.checkState(this.granularitySpec == null, "Cannot mix setIntervals with granularitySpec");
+    Preconditions.checkState(this.segmentGranularity != null, "Cannot use setIntervals without segmentGranularity");

     // For backwards compatibility
-    this.intervals = intervals;
-    if (this.segmentGranularity != null) {
-      this.granularitySpec = new UniformGranularitySpec(this.segmentGranularity, this.intervals);
-    }
+    this.granularitySpec = new UniformGranularitySpec(this.segmentGranularity, intervals);
   }

   @JsonProperty
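The @JsonCreator constructor above keeps old configs working: a spec may still supply the flat partitionDimension/targetPartitionSize fields, which are folded into a PartitionsSpec, but mixing them with an explicit partitionsSpec is rejected (the same pattern applies to segmentGranularity/intervals versus granularitySpec). A tiny sketch of that "new object wins, legacy fields are folded in, mixing is an error" rule in isolation (hypothetical types and method, not the real config class):

// Sketch of the backwards-compatibility rule applied by the constructor above.
class ConfigCompatSketch
{
  static class PartitionsSpecSketch
  {
    final String partitionDimension;
    final Long targetPartitionSize;
    final boolean assumeGrouped;

    PartitionsSpecSketch(String partitionDimension, Long targetPartitionSize, boolean assumeGrouped)
    {
      this.partitionDimension = partitionDimension;
      this.targetPartitionSize = targetPartitionSize;
      this.assumeGrouped = assumeGrouped;
    }
  }

  static PartitionsSpecSketch resolve(PartitionsSpecSketch partitionsSpec, String partitionDimension, Long targetPartitionSize)
  {
    if (partitionsSpec != null) {
      if (partitionDimension != null || targetPartitionSize != null) {
        throw new IllegalArgumentException("Cannot mix partitionsSpec with partitionDimension/targetPartitionSize");
      }
      return partitionsSpec;
    }
    // Backwards compatibility: synthesize the new form from the legacy flat fields.
    return new PartitionsSpecSketch(partitionDimension, targetPartitionSize, false);
  }
}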
@@ -237,6 +313,11 @@ public class HadoopDruidIndexerConfig
     this.timestampFormat = timestampFormat;
   }

+  public TimestampSpec getTimestampSpec()
+  {
+    return new TimestampSpec(timestampColumnName, timestampFormat);
+  }
+
   @JsonProperty
   public DataSpec getDataSpec()
   {
@@ -248,17 +329,30 @@ public class HadoopDruidIndexerConfig
     this.dataSpec = new ToLowercaseDataSpec(dataSpec);
   }

-  @Deprecated
-  @JsonProperty
-  public void setSegmentGranularity(Granularity segmentGranularity)
+  public StringInputRowParser getParser()
   {
-    Preconditions.checkState(this.granularitySpec == null, "Use setGranularitySpec");
+    final List<String> dimensionExclusions;

-    // For backwards compatibility
-    this.segmentGranularity = segmentGranularity;
-    if (this.intervals != null) {
-      this.granularitySpec = new UniformGranularitySpec(this.segmentGranularity, this.intervals);
+    if(getDataSpec().hasCustomDimensions()) {
+      dimensionExclusions = null;
+    } else {
+      dimensionExclusions = Lists.newArrayList();
+      dimensionExclusions.add(getTimestampColumnName());
+      dimensionExclusions.addAll(
+          Lists.transform(
+              getRollupSpec().getAggs(), new Function<AggregatorFactory, String>()
+              {
+                @Override
+                public String apply(AggregatorFactory aggregatorFactory)
+                {
+                  return aggregatorFactory.getName();
+                }
+              }
+          )
+      );
     }
+
+    return new StringInputRowParser(getTimestampSpec(), getDataSpec(), dimensionExclusions);
   }

   @JsonProperty
@ -269,15 +363,20 @@ public class HadoopDruidIndexerConfig
|
||||||
|
|
||||||
public void setGranularitySpec(GranularitySpec granularitySpec)
|
public void setGranularitySpec(GranularitySpec granularitySpec)
|
||||||
{
|
{
|
||||||
Preconditions.checkState(this.intervals == null, "Use setGranularitySpec instead of setIntervals");
|
|
||||||
Preconditions.checkState(
|
|
||||||
this.segmentGranularity == null,
|
|
||||||
"Use setGranularitySpec instead of setSegmentGranularity"
|
|
||||||
);
|
|
||||||
|
|
||||||
this.granularitySpec = granularitySpec;
|
this.granularitySpec = granularitySpec;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public PartitionsSpec getPartitionsSpec()
|
||||||
|
{
|
||||||
|
return partitionsSpec;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPartitionsSpec(PartitionsSpec partitionsSpec)
|
||||||
|
{
|
||||||
|
this.partitionsSpec = partitionsSpec;
|
||||||
|
}
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public PathSpec getPathSpec()
|
public PathSpec getPathSpec()
|
||||||
{
|
{
|
||||||
|
@ -322,31 +421,19 @@ public class HadoopDruidIndexerConfig
|
||||||
this.version = version;
|
this.version = version;
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonProperty
|
|
||||||
public String getPartitionDimension()
|
public String getPartitionDimension()
|
||||||
{
|
{
|
||||||
return partitionDimension;
|
return partitionsSpec.getPartitionDimension();
|
||||||
}
|
|
||||||
|
|
||||||
public void setPartitionDimension(String partitionDimension)
|
|
||||||
{
|
|
||||||
this.partitionDimension = (partitionDimension == null) ? partitionDimension : partitionDimension;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean partitionByDimension()
|
public boolean partitionByDimension()
|
||||||
{
|
{
|
||||||
return partitionDimension != null;
|
return partitionsSpec.isDeterminingPartitions();
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonProperty
|
|
||||||
public Long getTargetPartitionSize()
|
public Long getTargetPartitionSize()
|
||||||
{
|
{
|
||||||
return targetPartitionSize;
|
return partitionsSpec.getTargetPartitionSize();
|
||||||
}
|
|
||||||
|
|
||||||
public void setTargetPartitionSize(Long targetPartitionSize)
|
|
||||||
{
|
|
||||||
this.targetPartitionSize = targetPartitionSize;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isUpdaterJobSpecSet()
|
public boolean isUpdaterJobSpecSet()
|
||||||
|
@ -447,21 +534,15 @@ public class HadoopDruidIndexerConfig
|
||||||
********************************************/
|
********************************************/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the proper bucket for this "row"
|
* Get the proper bucket for some input row.
|
||||||
*
|
*
|
||||||
* @param theMap a Map that represents a "row", keys are column names, values are, well, values
|
* @param inputRow an InputRow
|
||||||
*
|
*
|
||||||
* @return the Bucket that this row belongs to
|
* @return the Bucket that this row belongs to
|
||||||
*/
|
*/
|
||||||
public Optional<Bucket> getBucket(Map<String, String> theMap)
|
public Optional<Bucket> getBucket(InputRow inputRow)
|
||||||
{
|
{
|
||||||
final Optional<Interval> timeBucket = getGranularitySpec().bucketInterval(
|
final Optional<Interval> timeBucket = getGranularitySpec().bucketInterval(new DateTime(inputRow.getTimestampFromEpoch()));
|
||||||
new DateTime(
|
|
||||||
theMap.get(
|
|
||||||
getTimestampColumnName()
|
|
||||||
)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
if (!timeBucket.isPresent()) {
|
if (!timeBucket.isPresent()) {
|
||||||
return Optional.absent();
|
return Optional.absent();
|
||||||
}
|
}
|
||||||
|
@ -473,7 +554,7 @@ public class HadoopDruidIndexerConfig
|
||||||
|
|
||||||
for (final HadoopyShardSpec hadoopyShardSpec : shards) {
|
for (final HadoopyShardSpec hadoopyShardSpec : shards) {
|
||||||
final ShardSpec actualSpec = hadoopyShardSpec.getActualSpec();
|
final ShardSpec actualSpec = hadoopyShardSpec.getActualSpec();
|
||||||
if (actualSpec.isInChunk(theMap)) {
|
if (actualSpec.isInChunk(inputRow)) {
|
||||||
return Optional.of(
|
return Optional.of(
|
||||||
new Bucket(
|
new Bucket(
|
||||||
hadoopyShardSpec.getShardNum(),
|
hadoopyShardSpec.getShardNum(),
|
||||||
|
@ -484,7 +565,7 @@ public class HadoopDruidIndexerConfig
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new ISE("row[%s] doesn't fit in any shard[%s]", theMap, shards);
|
throw new ISE("row[%s] doesn't fit in any shard[%s]", inputRow, shards);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Set<Interval> getSegmentGranularIntervals()
|
public Set<Interval> getSegmentGranularIntervals()
|
||||||
|
@ -566,6 +647,11 @@ public class HadoopDruidIndexerConfig
|
||||||
return new Path(makeIntermediatePath(), "segmentDescriptorInfo");
|
return new Path(makeIntermediatePath(), "segmentDescriptorInfo");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Path makeGroupedDataDir()
|
||||||
|
{
|
||||||
|
return new Path(makeIntermediatePath(), "groupedData");
|
||||||
|
}
|
||||||
|
|
||||||
public Path makeDescriptorInfoPath(DataSegment segment)
|
public Path makeDescriptorInfoPath(DataSegment segment)
|
||||||
{
|
{
|
||||||
return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", "")));
|
return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", "")));
|
||||||
|
@ -626,10 +712,5 @@ public class HadoopDruidIndexerConfig
|
||||||
|
|
||||||
final int nIntervals = getIntervals().size();
|
final int nIntervals = getIntervals().size();
|
||||||
Preconditions.checkArgument(nIntervals > 0, "intervals.size()[%s] <= 0", nIntervals);
|
Preconditions.checkArgument(nIntervals > 0, "intervals.size()[%s] <= 0", nIntervals);
|
||||||
|
|
||||||
if (partitionByDimension()) {
|
|
||||||
Preconditions.checkNotNull(partitionDimension);
|
|
||||||
Preconditions.checkNotNull(targetPartitionSize);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
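The constructor and accessor changes above replace the old top-level partitionDimension/targetPartitionSize fields of HadoopDruidIndexerConfig with a nested PartitionsSpec, while still accepting the legacy fields. The following is a minimal illustrative sketch, not part of this commit, of what that means for config deserialization; it assumes the same Jackson setup (DefaultObjectMapper) used by HadoopDruidIndexerConfigTest further down in this diff, and the sketch class name is hypothetical.

import com.metamx.druid.indexer.HadoopDruidIndexerConfig;
import com.metamx.druid.indexer.partitions.PartitionsSpec;
import com.metamx.druid.jackson.DefaultObjectMapper;
import org.codehaus.jackson.map.ObjectMapper;

public class PartitionsSpecConfigSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper jsonMapper = new DefaultObjectMapper();

    // New style: partitioning is described by a nested partitionsSpec object.
    final HadoopDruidIndexerConfig withSpec = jsonMapper.readValue(
        "{\"partitionsSpec\": {\"targetPartitionSize\": 100, \"partitionDimension\": \"foo\"}}",
        HadoopDruidIndexerConfig.class
    );

    // Legacy style: top-level fields are still accepted and wrapped into a PartitionsSpec.
    final HadoopDruidIndexerConfig legacy = jsonMapper.readValue(
        "{\"targetPartitionSize\": 100, \"partitionDimension\": \"foo\"}",
        HadoopDruidIndexerConfig.class
    );

    final PartitionsSpec spec = withSpec.getPartitionsSpec();
    System.out.println(spec.isDeterminingPartitions());   // true
    System.out.println(legacy.getTargetPartitionSize());  // 100

    // Supplying both a top-level targetPartitionSize and a partitionsSpec fails the
    // Preconditions.checkArgument in the constructor above.
  }
}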
@@ -0,0 +1,66 @@
+package com.metamx.druid.indexer;
+
+import com.metamx.common.RE;
+import com.metamx.druid.indexer.data.StringInputRowParser;
+import com.metamx.druid.input.InputRow;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+
+public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<LongWritable, Text, KEYOUT, VALUEOUT>
+{
+  private HadoopDruidIndexerConfig config;
+  private StringInputRowParser parser;
+
+  @Override
+  protected void setup(Context context)
+      throws IOException, InterruptedException
+  {
+    config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
+    parser = config.getParser();
+  }
+
+  public HadoopDruidIndexerConfig getConfig()
+  {
+    return config;
+  }
+
+  public StringInputRowParser getParser()
+  {
+    return parser;
+  }
+
+  @Override
+  protected void map(
+      LongWritable key, Text value, Context context
+  ) throws IOException, InterruptedException
+  {
+    try {
+      final InputRow inputRow;
+      try {
+        inputRow = parser.parse(value.toString());
+      }
+      catch (IllegalArgumentException e) {
+        if (config.isIgnoreInvalidRows()) {
+          context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
+          return; // we're ignoring this invalid row
+        } else {
+          throw e;
+        }
+      }
+
+      if(config.getGranularitySpec().bucketInterval(new DateTime(inputRow.getTimestampFromEpoch())).isPresent()) {
+        innerMap(inputRow, value, context);
+      }
+    }
+    catch (RuntimeException e) {
+      throw new RE(e, "Failure on row[%s]", value);
+    }
+  }
+
+  abstract protected void innerMap(InputRow inputRow, Text text, Context context)
+      throws IOException, InterruptedException;
+}
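The new abstract HadoopDruidIndexerMapper above centralizes line parsing, invalid-row counting and interval filtering, so concrete mappers only implement innerMap(). A minimal sketch of a hypothetical subclass follows; it is not part of this commit, and the class name and counter group/name are invented for illustration.

package com.metamx.druid.indexer;

import com.metamx.druid.input.InputRow;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class RowCountingMapper extends HadoopDruidIndexerMapper<NullWritable, NullWritable>
{
  @Override
  protected void innerMap(InputRow inputRow, Text text, Context context)
      throws IOException, InterruptedException
  {
    // Rows that fail to parse (when ignoreInvalidRows is set) or fall outside the
    // configured granularity intervals never reach this point, so just count the rest.
    context.getCounter("example", "validRows").increment(1);
  }
}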
@@ -19,31 +19,25 @@

 package com.metamx.druid.indexer;

-import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.io.Closeables;
 import com.google.common.primitives.Longs;
 import com.metamx.common.ISE;
-import com.metamx.common.RE;
-import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.logger.Logger;
-import com.metamx.common.parsers.Parser;
-import com.metamx.common.parsers.ParserUtils;
 import com.metamx.druid.aggregation.AggregatorFactory;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.index.QueryableIndex;
 import com.metamx.druid.index.v1.IncrementalIndex;
 import com.metamx.druid.index.v1.IndexIO;
 import com.metamx.druid.index.v1.IndexMerger;
+import com.metamx.druid.indexer.data.StringInputRowParser;
 import com.metamx.druid.indexer.rollup.DataRollupSpec;
-import com.metamx.druid.input.MapBasedInputRow;
+import com.metamx.druid.input.InputRow;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -53,13 +47,11 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.InvalidJobConfException;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@@ -68,7 +60,6 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;

-import javax.annotation.Nullable;
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -78,7 +69,6 @@ import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
@@ -127,7 +117,7 @@ public class IndexGeneratorJob implements Jobby
       job.setMapperClass(IndexGeneratorMapper.class);
       job.setMapOutputValueClass(Text.class);

-      SortableBytes.useSortableBytesAsKey(job);
+      SortableBytes.useSortableBytesAsMapOutputKey(job);

       job.setNumReduceTasks(Iterables.size(config.getAllBuckets()));
       job.setPartitionerClass(IndexGeneratorPartitioner.class);
@@ -144,7 +134,7 @@ public class IndexGeneratorJob implements Jobby
       job.setJarByClass(IndexGeneratorJob.class);

       job.submit();
-      log.info("Job submitted, status available at %s", job.getTrackingURL());
+      log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());

       boolean success = job.waitForCompletion(true);

@@ -159,75 +149,29 @@ public class IndexGeneratorJob implements Jobby
     }
   }

-  public static class IndexGeneratorMapper extends Mapper<LongWritable, Text, BytesWritable, Text>
+  public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Text>
   {
-    private HadoopDruidIndexerConfig config;
-    private Parser<String, Object> parser;
-    private Function<String, DateTime> timestampConverter;
-
     @Override
-    protected void setup(Context context)
-        throws IOException, InterruptedException
-    {
-      config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
-      parser = config.getDataSpec().getParser();
-      timestampConverter = ParserUtils.createTimestampParser(config.getTimestampFormat());
-    }
-
-    @Override
-    protected void map(
-        LongWritable key, Text value, Context context
+    protected void innerMap(
+        InputRow inputRow,
+        Text text,
+        Context context
     ) throws IOException, InterruptedException
     {
+      // Group by bucket, sort by timestamp
+      final Optional<Bucket> bucket = getConfig().getBucket(inputRow);

-      try {
-        final Map<String, Object> values = parser.parse(value.toString());
-
-        final String tsStr = (String) values.get(config.getTimestampColumnName());
-        final DateTime timestamp;
-        try {
-          timestamp = timestampConverter.apply(tsStr);
-        }
-        catch (IllegalArgumentException e) {
-          if (config.isIgnoreInvalidRows()) {
-            context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
-            return; // we're ignoring this invalid row
-          } else {
-            throw e;
-          }
-        }
-
-        Optional<Bucket> bucket = config.getBucket(
-            Maps.transformEntries(
-                values,
-                new Maps.EntryTransformer<String, Object, String>()
-                {
-                  @Override
-                  public String transformEntry(@Nullable String key, @Nullable Object value)
-                  {
-                    if (key.equalsIgnoreCase(config.getTimestampColumnName())) {
-                      return timestamp.toString();
-                    }
-                    return value.toString();
-                  }
-                }
-            )
-        );
-
-        if (bucket.isPresent()) {
-          // Group by bucket, sort by timestamp
-          context.write(
-              new SortableBytes(
-                  bucket.get().toGroupKey(),
-                  Longs.toByteArray(timestamp.getMillis())
-              ).toBytesWritable(),
-              value
-          );
-        }
-      }
-      catch (RuntimeException e) {
-        throw new RE(e, "Failure on row[%s]", value);
+      if(!bucket.isPresent()) {
+        throw new ISE("WTF?! No bucket found for row: %s", inputRow);
       }
+
+      context.write(
+          new SortableBytes(
+              bucket.get().toGroupKey(),
+              Longs.toByteArray(inputRow.getTimestampFromEpoch())
+          ).toBytesWritable(),
+          text
+      );
     }
   }

@@ -253,8 +197,7 @@ public class IndexGeneratorJob implements Jobby
   {
     private HadoopDruidIndexerConfig config;
     private List<String> metricNames = Lists.newArrayList();
-    private Function<String, DateTime> timestampConverter;
-    private Parser parser;
+    private StringInputRowParser parser;

     @Override
     protected void setup(Context context)
@@ -265,8 +208,8 @@ public class IndexGeneratorJob implements Jobby
       for (AggregatorFactory factory : config.getRollupSpec().getAggs()) {
         metricNames.add(factory.getName().toLowerCase());
       }
-      timestampConverter = ParserUtils.createTimestampParser(config.getTimestampFormat());
-      parser = config.getDataSpec().getParser();
+      parser = config.getParser();
     }

     @Override
@@ -299,32 +242,10 @@ public class IndexGeneratorJob implements Jobby

       for (final Text value : values) {
         context.progress();
-        Map<String, Object> event = parser.parse(value.toString());
-        final long timestamp = timestampConverter.apply((String) event.get(config.getTimestampColumnName()))
-                                                 .getMillis();
-        List<String> dimensionNames =
-            config.getDataSpec().hasCustomDimensions() ?
-            config.getDataSpec().getDimensions() :
-            Lists.newArrayList(
-                FunctionalIterable.create(event.keySet())
-                                  .filter(
-                                      new Predicate<String>()
-                                      {
-                                        @Override
-                                        public boolean apply(@Nullable String input)
-                                        {
-                                          return !(metricNames.contains(input.toLowerCase())
-                                                   || config.getTimestampColumnName()
-                                                            .equalsIgnoreCase(input));
-                                        }
-                                      }
-                                  )
-            );
-        allDimensionNames.addAll(dimensionNames);
-
-        int numRows = index.add(
-            new MapBasedInputRow(timestamp, dimensionNames, event)
-        );
+        final InputRow inputRow = parser.parse(value.toString());
+        allDimensionNames.addAll(inputRow.getDimensions());
+
+        int numRows = index.add(inputRow);
         ++lineCount;

         if (numRows >= rollupSpec.rowFlushBoundary) {
@@ -102,7 +102,7 @@ public class SortableBytes
     );
   }

-  public static void useSortableBytesAsKey(Job job)
+  public static void useSortableBytesAsMapOutputKey(Job job)
   {
     job.setMapOutputKeyClass(BytesWritable.class);
     job.setGroupingComparatorClass(SortableBytesGroupingComparator.class);
@@ -20,6 +20,9 @@
 package com.metamx.druid.indexer.granularity;

 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.metamx.common.Granularity;
 import com.metamx.common.guava.Comparators;
@@ -35,47 +38,47 @@ import java.util.TreeSet;
 public class UniformGranularitySpec implements GranularitySpec
 {
   final private Granularity granularity;
-  final private List<Interval> intervals;
+  final private List<Interval> inputIntervals;
+  final private ArbitraryGranularitySpec wrappedSpec;

   @JsonCreator
   public UniformGranularitySpec(
       @JsonProperty("gran") Granularity granularity,
-      @JsonProperty("intervals") List<Interval> intervals
+      @JsonProperty("intervals") List<Interval> inputIntervals
   )
   {
+    List<Interval> granularIntervals = Lists.newArrayList();
+
+    for (Interval inputInterval : inputIntervals) {
+      Iterables.addAll(granularIntervals, granularity.getIterable(inputInterval));
+    }
+
     this.granularity = granularity;
-    this.intervals = intervals;
+    this.inputIntervals = ImmutableList.copyOf(inputIntervals);
+    this.wrappedSpec = new ArbitraryGranularitySpec(granularIntervals);
   }

   @Override
   public SortedSet<Interval> bucketIntervals()
   {
-    final TreeSet<Interval> retVal = Sets.newTreeSet(Comparators.intervals());
-
-    for (Interval interval : intervals) {
-      for (Interval segmentInterval : granularity.getIterable(interval)) {
-        retVal.add(segmentInterval);
-      }
-    }
-
-    return retVal;
+    return wrappedSpec.bucketIntervals();
   }

   @Override
   public Optional<Interval> bucketInterval(DateTime dt)
   {
-    return Optional.of(granularity.bucket(dt));
+    return wrappedSpec.bucketInterval(dt);
   }

-  @JsonProperty
+  @JsonProperty("gran")
   public Granularity getGranularity()
   {
     return granularity;
   }

-  @JsonProperty
+  @JsonProperty("intervals")
   public Iterable<Interval> getIntervals()
   {
-    return intervals;
+    return inputIntervals;
   }
 }
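With the change above, UniformGranularitySpec builds the granular intervals once and delegates to a wrapped ArbitraryGranularitySpec, so bucketInterval() now returns absent for timestamps outside the configured intervals instead of always producing a bucket; the new assertions in UniformGranularityTest at the end of this diff exercise exactly this. A small illustrative sketch, not part of this commit; the granularity, intervals, and class name are example values only.

import com.google.common.base.Optional;
import com.metamx.common.Granularity;
import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.util.Arrays;

public class UniformGranularitySpecSketch
{
  public static void main(String[] args)
  {
    final UniformGranularitySpec spec = new UniformGranularitySpec(
        Granularity.DAY,
        Arrays.asList(new Interval("2012-01-01T00Z/2012-01-03T00Z"))
    );

    // Inside the configured intervals: a day-sized bucket comes back.
    System.out.println(spec.bucketInterval(new DateTime("2012-01-02T05Z")).isPresent()); // true

    // Outside the configured intervals: previously this still bucketed the timestamp,
    // now it is Optional.absent().
    final Optional<Interval> missing = spec.bucketInterval(new DateTime("2012-01-04T01Z"));
    System.out.println(missing.isPresent()); // false
  }
}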
@@ -0,0 +1,52 @@
+package com.metamx.druid.indexer.partitions;
+
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import javax.annotation.Nullable;
+
+public class PartitionsSpec
+{
+  @Nullable
+  private final String partitionDimension;
+
+  private final long targetPartitionSize;
+
+  private final boolean assumeGrouped;
+
+  public PartitionsSpec(
+      @JsonProperty("partitionDimension") @Nullable String partitionDimension,
+      @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
+      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
+  )
+  {
+    this.partitionDimension = partitionDimension;
+    this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
+    this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
+  }
+
+  @JsonIgnore
+  public boolean isDeterminingPartitions()
+  {
+    return targetPartitionSize > 0;
+  }
+
+  @JsonProperty
+  @Nullable
+  public String getPartitionDimension()
+  {
+    return partitionDimension;
+  }
+
+  @JsonProperty
+  public long getTargetPartitionSize()
+  {
+    return targetPartitionSize;
+  }
+
+  @JsonProperty
+  public boolean isAssumeGrouped()
+  {
+    return assumeGrouped;
+  }
+}
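A short sketch of how the PartitionsSpec defaults above behave; it is not part of this commit and the class name is hypothetical. A null targetPartitionSize is stored as -1, which turns partition determination off, while any positive size turns it on, with or without an explicit partitionDimension (the tests below call the null-dimension case the auto-dimension case).

import com.metamx.druid.indexer.partitions.PartitionsSpec;

public class PartitionsSpecSketch
{
  public static void main(String[] args)
  {
    // No target size: isDeterminingPartitions() is false.
    final PartitionsSpec none = new PartitionsSpec(null, null, null);
    System.out.println(none.isDeterminingPartitions()); // false

    // Positive target size: partition determination is on; the dimension may be
    // left null or pinned to a specific column.
    final PartitionsSpec auto = new PartitionsSpec(null, 100L, null);
    final PartitionsSpec fixed = new PartitionsSpec("foo", 100L, false);
    System.out.println(auto.isDeterminingPartitions()); // true
    System.out.println(fixed.getPartitionDimension());  // foo
  }
}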
@@ -22,6 +22,7 @@ package com.metamx.druid.indexer;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
+import com.metamx.druid.indexer.partitions.PartitionsSpec;
 import com.metamx.druid.jackson.DefaultObjectMapper;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.joda.time.Interval;
@@ -67,7 +68,7 @@ public class HadoopDruidIndexerConfigTest
   }

   @Test
-  public void testIntervalsAndSegmentGranularity() {
+  public void testGranularitySpecLegacy() {
     // Deprecated and replaced by granularitySpec, but still supported
     final HadoopDruidIndexerConfig cfg;

@@ -98,9 +99,8 @@ public class HadoopDruidIndexerConfigTest
     );
   }


   @Test
-  public void testCmdlineAndSegmentGranularity() {
+  public void testGranularitySpecPostConstructorIntervals() {
     // Deprecated and replaced by granularitySpec, but still supported
     final HadoopDruidIndexerConfig cfg;

@@ -133,7 +133,7 @@ public class HadoopDruidIndexerConfigTest
   }

   @Test
-  public void testInvalidCombination() {
+  public void testInvalidGranularityCombination() {
     boolean thrown = false;
     try {
       final HadoopDruidIndexerConfig cfg = jsonMapper.readValue(
@@ -154,4 +154,160 @@ public class HadoopDruidIndexerConfigTest

     Assert.assertTrue("Exception thrown", thrown);
   }

+  @Test
+  public void testPartitionsSpecNoPartitioning() {
+    final HadoopDruidIndexerConfig cfg;
+
+    try {
+      cfg = jsonMapper.readValue(
+          "{}",
+          HadoopDruidIndexerConfig.class
+      );
+    } catch(Exception e) {
+      throw Throwables.propagate(e);
+    }
+
+    final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
+
+    Assert.assertEquals(
+        "isDeterminingPartitions",
+        partitionsSpec.isDeterminingPartitions(),
+        false
+    );
+  }
+
+  @Test
+  public void testPartitionsSpecAutoDimension() {
+    final HadoopDruidIndexerConfig cfg;
+
+    try {
+      cfg = jsonMapper.readValue(
+          "{"
+          + "\"partitionsSpec\":{"
+          + "   \"targetPartitionSize\":100"
+          + " }"
+          + "}",
+          HadoopDruidIndexerConfig.class
+      );
+    } catch(Exception e) {
+      throw Throwables.propagate(e);
+    }
+
+    final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
+
+    Assert.assertEquals(
+        "isDeterminingPartitions",
+        partitionsSpec.isDeterminingPartitions(),
+        true
+    );
+
+    Assert.assertEquals(
+        "getTargetPartitionSize",
+        partitionsSpec.getTargetPartitionSize(),
+        100
+    );
+
+    Assert.assertEquals(
+        "getPartitionDimension",
+        partitionsSpec.getPartitionDimension(),
+        null
+    );
+  }
+
+  @Test
+  public void testPartitionsSpecSpecificDimension() {
+    final HadoopDruidIndexerConfig cfg;
+
+    try {
+      cfg = jsonMapper.readValue(
+          "{"
+          + "\"partitionsSpec\":{"
+          + "   \"targetPartitionSize\":100,"
+          + "   \"partitionDimension\":\"foo\""
+          + " }"
+          + "}",
+          HadoopDruidIndexerConfig.class
+      );
+    } catch(Exception e) {
+      throw Throwables.propagate(e);
+    }
+
+    final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
+
+    Assert.assertEquals(
+        "isDeterminingPartitions",
+        partitionsSpec.isDeterminingPartitions(),
+        true
+    );
+
+    Assert.assertEquals(
+        "getTargetPartitionSize",
+        partitionsSpec.getTargetPartitionSize(),
+        100
+    );
+
+    Assert.assertEquals(
+        "getPartitionDimension",
+        partitionsSpec.getPartitionDimension(),
+        "foo"
+    );
+  }
+
+  @Test
+  public void testPartitionsSpecLegacy() {
+    final HadoopDruidIndexerConfig cfg;
+
+    try {
+      cfg = jsonMapper.readValue(
+          "{"
+          + "\"targetPartitionSize\":100,"
+          + "\"partitionDimension\":\"foo\""
+          + "}",
+          HadoopDruidIndexerConfig.class
+      );
+    } catch(Exception e) {
+      throw Throwables.propagate(e);
+    }
+
+    final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
+
+    Assert.assertEquals(
+        "isDeterminingPartitions",
+        partitionsSpec.isDeterminingPartitions(),
+        true
+    );
+
+    Assert.assertEquals(
+        "getTargetPartitionSize",
+        partitionsSpec.getTargetPartitionSize(),
+        100
+    );
+
+    Assert.assertEquals(
+        "getPartitionDimension",
+        partitionsSpec.getPartitionDimension(),
+        "foo"
+    );
+  }
+
+  @Test
+  public void testInvalidPartitionsCombination() {
+    boolean thrown = false;
+    try {
+      final HadoopDruidIndexerConfig cfg = jsonMapper.readValue(
+          "{"
+          + "\"targetPartitionSize\":100,"
+          + "\"partitionsSpec\":{"
+          + "   \"targetPartitionSize\":100"
+          + " }"
+          + "}",
+          HadoopDruidIndexerConfig.class
+      );
+    } catch(Exception e) {
+      thrown = true;
+    }
+
+    Assert.assertTrue("Exception thrown", thrown);
+  }
 }
@@ -69,6 +69,12 @@ public class ArbitraryGranularityTest
         spec.bucketInterval(new DateTime("2012-01-03T01Z"))
     );

+    Assert.assertEquals(
+        "2012-01-04T01Z",
+        Optional.<Interval>absent(),
+        spec.bucketInterval(new DateTime("2012-01-04T01Z"))
+    );
+
     Assert.assertEquals(
         "2012-01-07T23:59:59.999Z",
         Optional.of(new Interval("2012-01-07T00Z/2012-01-08T00Z")),
@@ -72,6 +72,12 @@ public class UniformGranularityTest
         spec.bucketInterval(new DateTime("2012-01-03T01Z"))
     );

+    Assert.assertEquals(
+        "2012-01-04T01Z",
+        Optional.<Interval>absent(),
+        spec.bucketInterval(new DateTime("2012-01-04T01Z"))
+    );
+
     Assert.assertEquals(
         "2012-01-07T23:59:59.999Z",
         Optional.of(new Interval("2012-01-07T00Z/2012-01-08T00Z")),