diff --git a/index-common/src/main/java/com/metamx/druid/indexer/data/StringInputRowParser.java b/index-common/src/main/java/com/metamx/druid/indexer/data/StringInputRowParser.java
index 4c5e2dcd9bb..3721f2c14f7 100644
--- a/index-common/src/main/java/com/metamx/druid/indexer/data/StringInputRowParser.java
+++ b/index-common/src/main/java/com/metamx/druid/indexer/data/StringInputRowParser.java
@@ -19,6 +19,7 @@
 
 package com.metamx.druid.indexer.data;
 
+import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.metamx.common.exception.FormattedException;
@@ -56,7 +57,18 @@ public class StringInputRowParser
 
     this.dimensionExclusions = Sets.newHashSet();
     if (dimensionExclusions != null) {
-      this.dimensionExclusions.addAll(dimensionExclusions);
+      this.dimensionExclusions.addAll(
+          Lists.transform(
+              dimensionExclusions, new Function<String, String>()
+          {
+            @Override
+            public String apply(String s)
+            {
+              return s.toLowerCase();
+            }
+          }
+          )
+      );
     }
     this.dimensionExclusions.add(timestampSpec.getTimestampColumn());
 
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java b/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java
index a40642ebec5..0e6a686a0f7 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java
@@ -23,21 +23,23 @@ import com.google.common.base.Charsets;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.PeekingIterator;
 import com.google.common.io.Closeables;
-import com.metamx.common.IAE;
-import com.metamx.common.Pair;
+import com.metamx.common.ISE;
 import com.metamx.common.guava.nary.BinaryFn;
 import com.metamx.common.logger.Logger;
-import com.metamx.common.parsers.Parser;
-import com.metamx.common.parsers.ParserUtils;
 import com.metamx.druid.CombiningIterable;
+import com.metamx.druid.QueryGranularity;
+import com.metamx.druid.input.InputRow;
 import com.metamx.druid.shard.NoneShardSpec;
 import com.metamx.druid.shard.ShardSpec;
 import com.metamx.druid.shard.SingleDimensionShardSpec;
@@ -45,7 +47,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.InvalidJobConfException;
@@ -56,8 +58,11 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.codehaus.jackson.type.TypeReference;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeComparator;
@@ -65,20 +70,26 @@ import org.joda.time.Interval;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
+ * Determines appropriate ShardSpecs for a job by determining whether or not partitioning is necessary, and if so,
+ * choosing the highest cardinality dimension that satisfies the criteria:
+ *
+ * <ul>
+ *     <li>Must have exactly one value per row.</li>
+ *     <li>Must not generate oversized shards. A dimension with N rows having the same value will necessarily put
+ *     all those rows in the same shard, and that shard may be much larger than the target size.</li>
+ * </ul>
 */
 public class DeterminePartitionsJob implements Jobby
 {
   private static final Logger log = new Logger(DeterminePartitionsJob.class);
 
-  private static final Joiner keyJoiner = Joiner.on(",");
-  private static final Splitter keySplitter = Splitter.on(",");
   private static final Joiner tabJoiner = HadoopDruidIndexerConfig.tabJoiner;
   private static final Splitter tabSplitter = HadoopDruidIndexerConfig.tabSplitter;
 
@@ -91,146 +102,314 @@ public class DeterminePartitionsJob implements Jobby
     this.config = config;
   }
 
-  public boolean run()
+  public static void injectSystemProperties(Job job)
   {
-    try {
-      Job job = new Job(
-          new Configuration(),
-          String.format("%s-determine_partitions-%s", config.getDataSource(), config.getIntervals())
-      );
-
-      job.getConfiguration().set("io.sort.record.percent", "0.19");
-      for (String propName : System.getProperties().stringPropertyNames()) {
-        Configuration conf = job.getConfiguration();
-        if (propName.startsWith("hadoop.")) {
-          conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
-        }
+    final Configuration conf = job.getConfiguration();
+    for (String propName : System.getProperties().stringPropertyNames()) {
+      if (propName.startsWith("hadoop.")) {
+        conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
       }
-
-      job.setInputFormatClass(TextInputFormat.class);
-
-      job.setMapperClass(DeterminePartitionsMapper.class);
-      job.setMapOutputValueClass(Text.class);
-
-      SortableBytes.useSortableBytesAsKey(job);
-
-      job.setCombinerClass(DeterminePartitionsCombiner.class);
-      job.setReducerClass(DeterminePartitionsReducer.class);
-      job.setOutputKeyClass(BytesWritable.class);
-      job.setOutputValueClass(Text.class);
-      job.setOutputFormatClass(DeterminePartitionsJob.DeterminePartitionsOutputFormat.class);
-      FileOutputFormat.setOutputPath(job, config.makeIntermediatePath());
-
-      config.addInputPaths(job);
-      config.intoConfiguration(job);
-
-      job.setJarByClass(DeterminePartitionsJob.class);
-
-      job.submit();
-      log.info("Job submitted, status available at %s", job.getTrackingURL());
-
-      final boolean retVal = job.waitForCompletion(true);
-
-      if (retVal) {
-        log.info("Job completed, loading up partitions for intervals[%s].", config.getSegmentGranularIntervals());
-        FileSystem fileSystem = null;
-        Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
-        int shardCount = 0;
-        for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
-          DateTime bucket = segmentGranularity.getStart();
-
-          final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0));
-          if (fileSystem == null) {
-            fileSystem = partitionInfoPath.getFileSystem(job.getConfiguration());
-          }
-          if (fileSystem.exists(partitionInfoPath)) {
-            List<ShardSpec> specs = config.jsonMapper.readValue(
-                Utils.openInputStream(job, 
partitionInfoPath), new TypeReference>() - { - } - ); - - List actualSpecs = Lists.newArrayListWithExpectedSize(specs.size()); - for (int i = 0; i < specs.size(); ++i) { - actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++)); - log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i)); - } - - shardSpecs.put(bucket, actualSpecs); - } - else { - log.info("Path[%s] didn't exist!?", partitionInfoPath); - } - } - config.setShardSpecs(shardSpecs); - } - else { - log.info("Job completed unsuccessfully."); - } - - return retVal; - } - catch (Exception e) { - throw new RuntimeException(e); } } - public static class DeterminePartitionsMapper extends Mapper + public boolean run() { - private HadoopDruidIndexerConfig config; - private String partitionDimension; - private Parser parser; - private Function timestampConverter; + try { + /* + * Group by (timestamp, dimensions) so we can correctly count dimension values as they would appear + * in the final segment. + */ + + if(!config.getPartitionsSpec().isAssumeGrouped()) { + final Job groupByJob = new Job( + new Configuration(), + String.format("%s-determine_partitions_groupby-%s", config.getDataSource(), config.getIntervals()) + ); + + injectSystemProperties(groupByJob); + groupByJob.setInputFormatClass(TextInputFormat.class); + groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class); + groupByJob.setMapOutputKeyClass(BytesWritable.class); + groupByJob.setMapOutputValueClass(NullWritable.class); + groupByJob.setCombinerClass(DeterminePartitionsGroupByReducer.class); + groupByJob.setReducerClass(DeterminePartitionsGroupByReducer.class); + groupByJob.setOutputKeyClass(BytesWritable.class); + groupByJob.setOutputValueClass(NullWritable.class); + groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class); + groupByJob.setJarByClass(DeterminePartitionsJob.class); + + config.addInputPaths(groupByJob); + config.intoConfiguration(groupByJob); + FileOutputFormat.setOutputPath(groupByJob, config.makeGroupedDataDir()); + + groupByJob.submit(); + log.info("Job %s submitted, status available at: %s", groupByJob.getJobName(), groupByJob.getTrackingURL()); + + if(!groupByJob.waitForCompletion(true)) { + log.error("Job failed: %s", groupByJob.getJobID().toString()); + return false; + } + } else { + log.info("Skipping group-by job."); + } + + /* + * Read grouped data and determine appropriate partitions. + */ + final Job dimSelectionJob = new Job( + new Configuration(), + String.format("%s-determine_partitions_dimselection-%s", config.getDataSource(), config.getIntervals()) + ); + + dimSelectionJob.getConfiguration().set("io.sort.record.percent", "0.19"); + + injectSystemProperties(dimSelectionJob); + + if(!config.getPartitionsSpec().isAssumeGrouped()) { + // Read grouped data from the groupByJob. + dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionPostGroupByMapper.class); + dimSelectionJob.setInputFormatClass(SequenceFileInputFormat.class); + FileInputFormat.addInputPath(dimSelectionJob, config.makeGroupedDataDir()); + } else { + // Directly read the source data, since we assume it's already grouped. 
+ dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionAssumeGroupedMapper.class); + dimSelectionJob.setInputFormatClass(TextInputFormat.class); + config.addInputPaths(dimSelectionJob); + } + + SortableBytes.useSortableBytesAsMapOutputKey(dimSelectionJob); + dimSelectionJob.setMapOutputValueClass(Text.class); + dimSelectionJob.setCombinerClass(DeterminePartitionsDimSelectionCombiner.class); + dimSelectionJob.setReducerClass(DeterminePartitionsDimSelectionReducer.class); + dimSelectionJob.setOutputKeyClass(BytesWritable.class); + dimSelectionJob.setOutputValueClass(Text.class); + dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class); + dimSelectionJob.setJarByClass(DeterminePartitionsJob.class); + + config.intoConfiguration(dimSelectionJob); + FileOutputFormat.setOutputPath(dimSelectionJob, config.makeIntermediatePath()); + + dimSelectionJob.submit(); + log.info( + "Job %s submitted, status available at: %s", + dimSelectionJob.getJobName(), + dimSelectionJob.getTrackingURL() + ); + + if(!dimSelectionJob.waitForCompletion(true)) { + log.error("Job failed: %s", dimSelectionJob.getJobID().toString()); + return false; + } + + /* + * Load partitions determined by the previous job. + */ + + log.info("Job completed, loading up partitions for intervals[%s].", config.getSegmentGranularIntervals()); + FileSystem fileSystem = null; + Map> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance()); + int shardCount = 0; + for (Interval segmentGranularity : config.getSegmentGranularIntervals()) { + DateTime bucket = segmentGranularity.getStart(); + + final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0)); + if (fileSystem == null) { + fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration()); + } + if (fileSystem.exists(partitionInfoPath)) { + List specs = config.jsonMapper.readValue( + Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference>() + { + } + ); + + List actualSpecs = Lists.newArrayListWithExpectedSize(specs.size()); + for (int i = 0; i < specs.size(); ++i) { + actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++)); + log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i)); + } + + shardSpecs.put(bucket, actualSpecs); + } + else { + log.info("Path[%s] didn't exist!?", partitionInfoPath); + } + } + config.setShardSpecs(shardSpecs); + + return true; + } catch(Exception e) { + throw Throwables.propagate(e); + } + } + + public static class DeterminePartitionsGroupByMapper extends HadoopDruidIndexerMapper + { + private QueryGranularity rollupGranularity = null; @Override protected void setup(Context context) throws IOException, InterruptedException { - config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); - partitionDimension = config.getPartitionDimension(); - parser = config.getDataSpec().getParser(); - timestampConverter = ParserUtils.createTimestampParser(config.getTimestampFormat()); + super.setup(context); + rollupGranularity = getConfig().getRollupSpec().getRollupGranularity(); + } + + @Override + protected void innerMap( + InputRow inputRow, + Text text, + Context context + ) throws IOException, InterruptedException + { + // Create group key + // TODO -- There are more efficient ways to do this + final Map> dims = Maps.newTreeMap(); + for(final String dim : inputRow.getDimensions()) { + final Set dimValues = ImmutableSortedSet.copyOf(inputRow.getDimension(dim)); + if(dimValues.size() > 0) { + 
dims.put(dim, dimValues); + } + } + final List groupKey = ImmutableList.of( + rollupGranularity.truncate(inputRow.getTimestampFromEpoch()), + dims + ); + context.write( + new BytesWritable(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey)), + NullWritable.get() + ); + } + } + + public static class DeterminePartitionsGroupByReducer + extends Reducer + { + @Override + protected void reduce( + BytesWritable key, + Iterable values, + Context context + ) throws IOException, InterruptedException + { + context.write(key, NullWritable.get()); + } + } + + /** + * This DimSelection mapper runs on data generated by our GroupBy job. + */ + public static class DeterminePartitionsDimSelectionPostGroupByMapper + extends Mapper + { + private DeterminePartitionsDimSelectionMapperHelper helper; + + @Override + protected void setup(Context context) + throws IOException, InterruptedException + { + final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); + final String partitionDimension = config.getPartitionDimension(); + helper = new DeterminePartitionsDimSelectionMapperHelper(config, partitionDimension); } @Override protected void map( - LongWritable key, Text value, Context context + BytesWritable key, NullWritable value, Context context ) throws IOException, InterruptedException { - Map values = parser.parse(value.toString()); - final DateTime timestamp; - final String tsStr = (String) values.get(config.getTimestampColumnName()); - try { - timestamp = timestampConverter.apply(tsStr); - } - catch(IllegalArgumentException e) { - if(config.isIgnoreInvalidRows()) { - context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1); - return; // we're ignoring this invalid row - } - else { - throw e; - } - } + final List timeAndDims = HadoopDruidIndexerConfig.jsonMapper.readValue(key.getBytes(), List.class); + final DateTime timestamp = new DateTime(timeAndDims.get(0)); + final Map> dims = (Map>) timeAndDims.get(1); + + helper.emitDimValueCounts(context, timestamp, dims); + } + } + + /** + * This DimSelection mapper runs on raw input data that we assume has already been grouped. + */ + public static class DeterminePartitionsDimSelectionAssumeGroupedMapper + extends HadoopDruidIndexerMapper + { + private DeterminePartitionsDimSelectionMapperHelper helper; + + @Override + protected void setup(Context context) + throws IOException, InterruptedException + { + super.setup(context); + final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); + final String partitionDimension = config.getPartitionDimension(); + helper = new DeterminePartitionsDimSelectionMapperHelper(config, partitionDimension); + } + + @Override + protected void innerMap( + InputRow inputRow, + Text text, + Context context + ) throws IOException, InterruptedException + { + final Map> dims = Maps.newHashMap(); + for(final String dim : inputRow.getDimensions()) { + dims.put(dim, inputRow.getDimension(dim)); + } + helper.emitDimValueCounts(context, new DateTime(inputRow.getTimestampFromEpoch()), dims); + } + } + + /** + * Since we have two slightly different DimSelectionMappers, this class encapsulates the shared logic for + * emitting dimension value counts. 
+ */ + public static class DeterminePartitionsDimSelectionMapperHelper + { + private final HadoopDruidIndexerConfig config; + private final String partitionDimension; + + public DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config, String partitionDimension) + { + this.config = config; + this.partitionDimension = partitionDimension; + } + + public void emitDimValueCounts( + TaskInputOutputContext context, + DateTime timestamp, + Map> dims + ) throws IOException, InterruptedException + { final Optional maybeInterval = config.getGranularitySpec().bucketInterval(timestamp); - if(maybeInterval.isPresent()) { - final DateTime bucket = maybeInterval.get().getStart(); - final String outKey = keyJoiner.join(bucket.toString(), partitionDimension); - final Object dimValue = values.get(partitionDimension); - if (! (dimValue instanceof String)) { - throw new IAE("Cannot partition on a tag-style dimension[%s], line was[%s]", partitionDimension, value); + if(!maybeInterval.isPresent()) { + throw new ISE("WTF?! No bucket found for timestamp: %s", timestamp); + } + + final Interval interval = maybeInterval.get(); + final byte[] groupKey = interval.getStart().toString().getBytes(Charsets.UTF_8); + + for(final Map.Entry> dimAndValues : dims.entrySet()) { + final String dim = dimAndValues.getKey(); + + if(partitionDimension == null || partitionDimension.equals(dim)) { + final Iterable dimValues = dimAndValues.getValue(); + + if(Iterables.size(dimValues) == 1) { + // Emit this value. + write(context, groupKey, new DimValueCount(dim, Iterables.getOnlyElement(dimValues), 1)); + } else { + // This dimension is unsuitable for partitioning. Poison it by emitting a negative value. + write(context, groupKey, new DimValueCount(dim, "", -1)); + } } - - final byte[] groupKey = outKey.getBytes(Charsets.UTF_8); - write(context, groupKey, "", 1); - write(context, groupKey, (String) dimValue, 1); } } } - private static abstract class DeterminePartitionsBaseReducer extends Reducer + private static abstract class DeterminePartitionsDimSelectionBaseReducer + extends Reducer { protected static volatile HadoopDruidIndexerConfig config = null; @@ -240,7 +419,7 @@ public class DeterminePartitionsJob implements Jobby throws IOException, InterruptedException { if (config == null) { - synchronized (DeterminePartitionsBaseReducer.class) { + synchronized (DeterminePartitionsDimSelectionBaseReducer.class) { if (config == null) { config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); } @@ -255,166 +434,275 @@ public class DeterminePartitionsJob implements Jobby { SortableBytes keyBytes = SortableBytes.fromBytesWritable(key); - final Iterable> combinedIterable = combineRows(values); + final Iterable combinedIterable = combineRows(values); innerReduce(context, keyBytes, combinedIterable); } protected abstract void innerReduce( - Context context, SortableBytes keyBytes, Iterable> combinedIterable + Context context, SortableBytes keyBytes, Iterable combinedIterable ) throws IOException, InterruptedException; - private Iterable> combineRows(Iterable input) + private Iterable combineRows(Iterable input) { - return new CombiningIterable>( + return new CombiningIterable( Iterables.transform( input, - new Function>() + new Function() { @Override - public Pair apply(Text input) + public DimValueCount apply(Text input) { - Iterator splits = tabSplitter.split(input.toString()).iterator(); - return new Pair(splits.next(), Long.parseLong(splits.next())); + return DimValueCount.fromText(input); } } ), - 
new Comparator>() + new Comparator() { @Override - public int compare(Pair o1, Pair o2) + public int compare(DimValueCount o1, DimValueCount o2) { - return o1.lhs.compareTo(o2.lhs); + return ComparisonChain.start().compare(o1.dim, o2.dim).compare(o1.value, o2.value).result(); } }, - new BinaryFn, Pair, Pair>() + new BinaryFn() { @Override - public Pair apply(Pair arg1, Pair arg2) + public DimValueCount apply(DimValueCount arg1, DimValueCount arg2) { if (arg2 == null) { return arg1; } - return new Pair(arg1.lhs, arg1.rhs + arg2.rhs); + // Respect "poisoning" (negative values mean we can't use this dimension) + final int newNumRows = (arg1.numRows >= 0 && arg2.numRows >= 0 ? arg1.numRows + arg2.numRows : -1); + return new DimValueCount(arg1.dim, arg1.value, newNumRows); } } ); } } - public static class DeterminePartitionsCombiner extends DeterminePartitionsBaseReducer + public static class DeterminePartitionsDimSelectionCombiner extends DeterminePartitionsDimSelectionBaseReducer { @Override protected void innerReduce( - Context context, SortableBytes keyBytes, Iterable> combinedIterable + Context context, SortableBytes keyBytes, Iterable combinedIterable ) throws IOException, InterruptedException { - for (Pair pair : combinedIterable) { - write(context, keyBytes.getGroupKey(), pair.lhs, pair.rhs); + for (DimValueCount dvc : combinedIterable) { + write(context, keyBytes.getGroupKey(), dvc); } } } - public static class DeterminePartitionsReducer extends DeterminePartitionsBaseReducer + public static class DeterminePartitionsDimSelectionReducer extends DeterminePartitionsDimSelectionBaseReducer { - String previousBoundary; - long runningTotal; + private static final double SHARD_COMBINE_THRESHOLD = 0.25; + private static final double SHARD_OVERSIZE_THRESHOLD = 1.5; @Override protected void innerReduce( - Context context, SortableBytes keyBytes, Iterable> combinedIterable + Context context, SortableBytes keyBytes, Iterable combinedIterable ) throws IOException, InterruptedException { - PeekingIterator> iterator = Iterators.peekingIterator(combinedIterable.iterator()); - Pair totalPair = iterator.next(); + PeekingIterator iterator = Iterators.peekingIterator(combinedIterable.iterator()); - Preconditions.checkState(totalPair.lhs.equals(""), "Total pair value was[%s]!?", totalPair.lhs); - long totalRows = totalPair.rhs; + // "iterator" will take us over many candidate dimensions + DimPartitions currentDimPartitions = null; + DimPartition currentDimPartition = null; + String currentDimPartitionStart = null; + boolean currentDimSkip = false; - long numPartitions = Math.max(totalRows / config.getTargetPartitionSize(), 1); - long expectedRowsPerPartition = totalRows / numPartitions; + // We'll store possible partitions in here + final Map dimPartitionss = Maps.newHashMap(); - class PartitionsList extends ArrayList - { - } - List partitions = new PartitionsList(); + while(iterator.hasNext()) { + final DimValueCount dvc = iterator.next(); - runningTotal = 0; - Pair prev = null; - previousBoundary = null; - while (iterator.hasNext()) { - Pair curr = iterator.next(); - - if (runningTotal > expectedRowsPerPartition) { - Preconditions.checkNotNull( - prev, "Prev[null] while runningTotal[%s] was > expectedRows[%s]!?", runningTotal, expectedRowsPerPartition - ); - - addPartition(partitions, curr.lhs); + if(currentDimPartitions == null || !currentDimPartitions.dim.equals(dvc.dim)) { + // Starting a new dimension! Exciting! 
+ currentDimPartitions = new DimPartitions(dvc.dim); + currentDimPartition = new DimPartition(); + currentDimPartitionStart = null; + currentDimSkip = false; } - runningTotal += curr.rhs; - prev = curr; + // Respect poisoning + if(!currentDimSkip && dvc.numRows < 0) { + log.info("Cannot partition on multi-valued dimension: %s", dvc.dim); + currentDimSkip = true; + } + + if(currentDimSkip) { + continue; + } + + // See if we need to cut a new partition ending immediately before this dimension value + if(currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows >= config.getTargetPartitionSize()) { + final ShardSpec shardSpec = new SingleDimensionShardSpec( + currentDimPartitions.dim, + currentDimPartitionStart, + dvc.value, + currentDimPartitions.partitions.size() + ); + + log.info( + "Adding possible shard with %,d rows and %,d unique values: %s", + currentDimPartition.rows, + currentDimPartition.cardinality, + shardSpec + ); + + currentDimPartition.shardSpec = shardSpec; + currentDimPartitions.partitions.add(currentDimPartition); + currentDimPartition = new DimPartition(); + currentDimPartitionStart = dvc.value; + } + + // Update counters + currentDimPartition.cardinality ++; + currentDimPartition.rows += dvc.numRows; + + if(!iterator.hasNext() || !currentDimPartitions.dim.equals(iterator.peek().dim)) { + // Finalize the current dimension + + if(currentDimPartition.rows > 0) { + // One more shard to go + final ShardSpec shardSpec; + + if (currentDimPartitions.partitions.isEmpty()) { + shardSpec = new NoneShardSpec(); + } else { + if(currentDimPartition.rows < config.getTargetPartitionSize() * SHARD_COMBINE_THRESHOLD) { + // Combine with previous shard + final DimPartition previousDimPartition = currentDimPartitions.partitions.remove( + currentDimPartitions.partitions.size() - 1 + ); + + final SingleDimensionShardSpec previousShardSpec = (SingleDimensionShardSpec) previousDimPartition.shardSpec; + + shardSpec = new SingleDimensionShardSpec( + currentDimPartitions.dim, + previousShardSpec.getStart(), + null, + previousShardSpec.getPartitionNum() + ); + + log.info("Removing possible shard: %s", previousShardSpec); + + currentDimPartition.rows += previousDimPartition.rows; + currentDimPartition.cardinality += previousDimPartition.cardinality; + } else { + // Create new shard + shardSpec = new SingleDimensionShardSpec( + currentDimPartitions.dim, + currentDimPartitionStart, + null, + currentDimPartitions.partitions.size() + ); + } + } + + log.info( + "Adding possible shard with %,d rows and %,d unique values: %s", + currentDimPartition.rows, + currentDimPartition.cardinality, + shardSpec + ); + + currentDimPartition.shardSpec = shardSpec; + currentDimPartitions.partitions.add(currentDimPartition); + } + + log.info( + "Completed dimension[%s]: %,d possible shards with %,d unique values", + currentDimPartitions.dim, + currentDimPartitions.partitions.size(), + currentDimPartitions.getCardinality() + ); + + // Add ourselves to the partitions map + dimPartitionss.put(currentDimPartitions.dim, currentDimPartitions); + } } - if (partitions.isEmpty()) { - partitions.add(new NoneShardSpec()); - } else if (((double) runningTotal / (double) expectedRowsPerPartition) < 0.25) { - final SingleDimensionShardSpec lastSpec = (SingleDimensionShardSpec) partitions.remove(partitions.size() - 1); - partitions.add( - new SingleDimensionShardSpec( - config.getPartitionDimension(), - lastSpec.getStart(), - null, - lastSpec.getPartitionNum() - ) - ); - } else { - partitions.add( - new 
SingleDimensionShardSpec( - config.getPartitionDimension(), - previousBoundary, - null, - partitions.size() - ) - ); + // Choose best dimension + if(dimPartitionss.isEmpty()) { + throw new ISE("No suitable partitioning dimension found!"); } - DateTime bucket = new DateTime( - Iterables.get(keySplitter.split(new String(keyBytes.getGroupKey(), Charsets.UTF_8)), 0) - ); - OutputStream out = Utils.makePathAndOutputStream( + final int totalRows = dimPartitionss.values().iterator().next().getRows(); + + int maxCardinality = -1; + DimPartitions maxCardinalityPartitions = null; + + for(final DimPartitions dimPartitions : dimPartitionss.values()) { + if(dimPartitions.getRows() != totalRows) { + throw new ISE( + "WTF?! Dimension[%s] row count %,d != expected row count %,d", + dimPartitions.dim, + dimPartitions.getRows(), + totalRows + ); + } + + // Make sure none of these shards are oversized + boolean oversized = false; + for(final DimPartition partition : dimPartitions.partitions) { + if(partition.rows > config.getTargetPartitionSize() * SHARD_OVERSIZE_THRESHOLD) { + log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec); + oversized = true; + } + } + + if(oversized) { + continue; + } + + if(dimPartitions.getCardinality() > maxCardinality) { + maxCardinality = dimPartitions.getCardinality(); + maxCardinalityPartitions = dimPartitions; + } + } + + if(maxCardinalityPartitions == null) { + throw new ISE("No suitable partitioning dimension found!"); + } + + final DateTime bucket = new DateTime(new String(keyBytes.getGroupKey(), Charsets.UTF_8)); + final OutputStream out = Utils.makePathAndOutputStream( context, config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0)), config.isOverwriteFiles() ); - for (ShardSpec partition : partitions) { - log.info("%s", partition); + final List chosenShardSpecs = Lists.transform( + maxCardinalityPartitions.partitions, new Function() + { + @Override + public ShardSpec apply(DimPartition dimPartition) + { + return dimPartition.shardSpec; + } + } + ); + + log.info("Chosen partitions:"); + for (ShardSpec shardSpec : chosenShardSpecs) { + log.info(" %s", shardSpec); } try { - config.jsonMapper.writeValue(out, partitions); + HadoopDruidIndexerConfig.jsonMapper.writerWithType(new TypeReference>() {}).writeValue( + out, + chosenShardSpecs + ); } finally { Closeables.close(out, false); } } - - private void addPartition(List partitions, String boundary) - { - partitions.add( - new SingleDimensionShardSpec( - config.getPartitionDimension(), - previousBoundary, - boundary, - partitions.size() - ) - ); - previousBoundary = boundary; - runningTotal = 0; - } } - public static class DeterminePartitionsOutputFormat extends FileOutputFormat + public static class DeterminePartitionsDimSelectionOutputFormat extends FileOutputFormat { @Override public RecordWriter getRecordWriter(final TaskAttemptContext job) throws IOException, InterruptedException @@ -444,17 +732,81 @@ public class DeterminePartitionsJob implements Jobby } } + private static class DimPartitions + { + public final String dim; + public final List partitions = Lists.newArrayList(); + + private DimPartitions(String dim) + { + this.dim = dim; + } + + public int getCardinality() + { + int sum = 0; + for(final DimPartition dimPartition : partitions) { + sum += dimPartition.cardinality; + } + return sum; + } + + public int getRows() + { + int sum = 0; + for(final DimPartition dimPartition : partitions) { + sum += dimPartition.rows; + } + return sum; + } + } + + private static class 
DimPartition + { + public ShardSpec shardSpec = null; + public int cardinality = 0; + public int rows = 0; + } + + private static class DimValueCount + { + public final String dim; + public final String value; + public final int numRows; + + private DimValueCount(String dim, String value, int numRows) + { + this.dim = dim; + this.value = value; + this.numRows = numRows; + } + + public Text toText() + { + return new Text(tabJoiner.join(dim, String.valueOf(numRows), value)); + } + + public static DimValueCount fromText(Text text) + { + final Iterator splits = tabSplitter.limit(3).split(text.toString()).iterator(); + final String dim = splits.next(); + final int numRows = Integer.parseInt(splits.next()); + final String value = splits.next(); + + return new DimValueCount(dim, value, numRows); + } + } + private static void write( TaskInputOutputContext context, final byte[] groupKey, - String value, - long numRows + DimValueCount dimValueCount ) throws IOException, InterruptedException { context.write( - new SortableBytes(groupKey, value.getBytes(HadoopDruidIndexerConfig.javaNativeCharset)).toBytesWritable(), - new Text(tabJoiner.join(value, numRows)) + new SortableBytes(groupKey, tabJoiner.join(dimValueCount.dim, dimValueCount.value).getBytes(HadoopDruidIndexerConfig.javaNativeCharset)).toBytesWritable(), + dimValueCount.toText() ); } } diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java index 403484b9c61..3d682dadce0 100644 --- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java @@ -34,15 +34,20 @@ import com.metamx.common.MapUtils; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; import com.metamx.druid.RegisteringNode; +import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.client.DataSegment; import com.metamx.druid.index.v1.serde.Registererer; import com.metamx.druid.indexer.data.DataSpec; +import com.metamx.druid.indexer.data.StringInputRowParser; +import com.metamx.druid.indexer.data.TimestampSpec; import com.metamx.druid.indexer.data.ToLowercaseDataSpec; import com.metamx.druid.indexer.granularity.GranularitySpec; import com.metamx.druid.indexer.granularity.UniformGranularitySpec; +import com.metamx.druid.indexer.partitions.PartitionsSpec; import com.metamx.druid.indexer.path.PathSpec; import com.metamx.druid.indexer.rollup.DataRollupSpec; import com.metamx.druid.indexer.updater.UpdaterJobSpec; +import com.metamx.druid.input.InputRow; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.shard.ShardSpec; import com.metamx.druid.utils.JodaUtils; @@ -50,6 +55,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; @@ -60,8 +66,6 @@ import org.joda.time.format.ISODateTimeFormat; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; import java.nio.charset.Charset; import java.util.Arrays; import java.util.Collections; @@ -162,8 +166,6 @@ public class HadoopDruidIndexerConfig private static 
final String CONFIG_PROPERTY = "druid.indexer.config"; - @Deprecated - private volatile List intervals; private volatile String dataSource; private volatile String timestampColumnName; private volatile String timestampFormat; @@ -175,8 +177,7 @@ public class HadoopDruidIndexerConfig private volatile String jobOutputDir; private volatile String segmentOutputDir; private volatile DateTime version = new DateTime(); - private volatile String partitionDimension; - private volatile Long targetPartitionSize; + private volatile PartitionsSpec partitionsSpec; private volatile boolean leaveIntermediate = false; private volatile boolean cleanupOnFailure = true; private volatile Map> shardSpecs = ImmutableMap.of(); @@ -186,22 +187,97 @@ public class HadoopDruidIndexerConfig private volatile boolean ignoreInvalidRows = false; private volatile List registererers = Lists.newArrayList(); + @JsonCreator + public HadoopDruidIndexerConfig( + final @JsonProperty("intervals") List intervals, + final @JsonProperty("dataSource") String dataSource, + final @JsonProperty("timestampColumnName") String timestampColumnName, + final @JsonProperty("timestampFormat") String timestampFormat, + final @JsonProperty("dataSpec") DataSpec dataSpec, + final @JsonProperty("segmentGranularity") Granularity segmentGranularity, + final @JsonProperty("granularitySpec") GranularitySpec granularitySpec, + final @JsonProperty("pathSpec") PathSpec pathSpec, + final @JsonProperty("jobOutputDir") String jobOutputDir, + final @JsonProperty("segmentOutputDir") String segmentOutputDir, + final @JsonProperty("version") DateTime version, + final @JsonProperty("partitionDimension") String partitionDimension, + final @JsonProperty("targetPartitionSize") Long targetPartitionSize, + final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec, + final @JsonProperty("leaveIntermediate") boolean leaveIntermediate, + final @JsonProperty("cleanupOnFailure") boolean cleanupOnFailure, + final @JsonProperty("shardSpecs") Map> shardSpecs, + final @JsonProperty("overwriteFiles") boolean overwriteFiles, + final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec, + final @JsonProperty("updaterJobSpec") UpdaterJobSpec updaterJobSpec, + final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows, + final @JsonProperty("registererers") List registererers + ) + { + this.dataSource = dataSource; + this.timestampColumnName = timestampColumnName; + this.timestampFormat = timestampFormat; + this.dataSpec = dataSpec; + this.granularitySpec = granularitySpec; + this.pathSpec = pathSpec; + this.jobOutputDir = jobOutputDir; + this.segmentOutputDir = segmentOutputDir; + this.version = version; + this.partitionsSpec = partitionsSpec; + this.leaveIntermediate = leaveIntermediate; + this.cleanupOnFailure = cleanupOnFailure; + this.shardSpecs = shardSpecs; + this.overwriteFiles = overwriteFiles; + this.rollupSpec = rollupSpec; + this.updaterJobSpec = updaterJobSpec; + this.ignoreInvalidRows = ignoreInvalidRows; + this.registererers = registererers; + + if(partitionsSpec != null) { + Preconditions.checkArgument( + partitionDimension == null && targetPartitionSize == null, + "Cannot mix partitionsSpec with partitionDimension/targetPartitionSize" + ); + + this.partitionsSpec = partitionsSpec; + } else { + // Backwards compatibility + this.partitionsSpec = new PartitionsSpec(partitionDimension, targetPartitionSize, false); + } + + if(granularitySpec != null) { + Preconditions.checkArgument( + segmentGranularity == null && intervals == null, + "Cannot mix 
granularitySpec with segmentGranularity/intervals" + ); + } else { + // Backwards compatibility + this.segmentGranularity = segmentGranularity; + if(segmentGranularity != null && intervals != null) { + this.granularitySpec = new UniformGranularitySpec(segmentGranularity, intervals); + } + } + } + + /** + * Default constructor does nothing. The caller is expected to use the various setX methods. + */ + public HadoopDruidIndexerConfig() + { + } + public List getIntervals() { return JodaUtils.condenseIntervals(getGranularitySpec().bucketIntervals()); } @Deprecated - @JsonProperty public void setIntervals(List intervals) { - Preconditions.checkState(this.granularitySpec == null, "Use setGranularitySpec"); + Preconditions.checkState(this.granularitySpec == null, "Cannot mix setIntervals with granularitySpec"); + Preconditions.checkState(this.segmentGranularity != null, "Cannot use setIntervals without segmentGranularity"); // For backwards compatibility - this.intervals = intervals; - if (this.segmentGranularity != null) { - this.granularitySpec = new UniformGranularitySpec(this.segmentGranularity, this.intervals); - } + this.granularitySpec = new UniformGranularitySpec(this.segmentGranularity, intervals); } @JsonProperty @@ -237,6 +313,11 @@ public class HadoopDruidIndexerConfig this.timestampFormat = timestampFormat; } + public TimestampSpec getTimestampSpec() + { + return new TimestampSpec(timestampColumnName, timestampFormat); + } + @JsonProperty public DataSpec getDataSpec() { @@ -248,17 +329,30 @@ public class HadoopDruidIndexerConfig this.dataSpec = new ToLowercaseDataSpec(dataSpec); } - @Deprecated - @JsonProperty - public void setSegmentGranularity(Granularity segmentGranularity) + public StringInputRowParser getParser() { - Preconditions.checkState(this.granularitySpec == null, "Use setGranularitySpec"); + final List dimensionExclusions; - // For backwards compatibility - this.segmentGranularity = segmentGranularity; - if (this.intervals != null) { - this.granularitySpec = new UniformGranularitySpec(this.segmentGranularity, this.intervals); + if(getDataSpec().hasCustomDimensions()) { + dimensionExclusions = null; + } else { + dimensionExclusions = Lists.newArrayList(); + dimensionExclusions.add(getTimestampColumnName()); + dimensionExclusions.addAll( + Lists.transform( + getRollupSpec().getAggs(), new Function() + { + @Override + public String apply(AggregatorFactory aggregatorFactory) + { + return aggregatorFactory.getName(); + } + } + ) + ); } + + return new StringInputRowParser(getTimestampSpec(), getDataSpec(), dimensionExclusions); } @JsonProperty @@ -269,15 +363,20 @@ public class HadoopDruidIndexerConfig public void setGranularitySpec(GranularitySpec granularitySpec) { - Preconditions.checkState(this.intervals == null, "Use setGranularitySpec instead of setIntervals"); - Preconditions.checkState( - this.segmentGranularity == null, - "Use setGranularitySpec instead of setSegmentGranularity" - ); - this.granularitySpec = granularitySpec; } + @JsonProperty + public PartitionsSpec getPartitionsSpec() + { + return partitionsSpec; + } + + public void setPartitionsSpec(PartitionsSpec partitionsSpec) + { + this.partitionsSpec = partitionsSpec; + } + @JsonProperty public PathSpec getPathSpec() { @@ -322,31 +421,19 @@ public class HadoopDruidIndexerConfig this.version = version; } - @JsonProperty public String getPartitionDimension() { - return partitionDimension; - } - - public void setPartitionDimension(String partitionDimension) - { - this.partitionDimension = (partitionDimension == 
null) ? partitionDimension : partitionDimension; + return partitionsSpec.getPartitionDimension(); } public boolean partitionByDimension() { - return partitionDimension != null; + return partitionsSpec.isDeterminingPartitions(); } - @JsonProperty public Long getTargetPartitionSize() { - return targetPartitionSize; - } - - public void setTargetPartitionSize(Long targetPartitionSize) - { - this.targetPartitionSize = targetPartitionSize; + return partitionsSpec.getTargetPartitionSize(); } public boolean isUpdaterJobSpecSet() @@ -447,21 +534,15 @@ public class HadoopDruidIndexerConfig ********************************************/ /** - * Get the proper bucket for this "row" + * Get the proper bucket for some input row. * - * @param theMap a Map that represents a "row", keys are column names, values are, well, values + * @param inputRow an InputRow * * @return the Bucket that this row belongs to */ - public Optional getBucket(Map theMap) + public Optional getBucket(InputRow inputRow) { - final Optional timeBucket = getGranularitySpec().bucketInterval( - new DateTime( - theMap.get( - getTimestampColumnName() - ) - ) - ); + final Optional timeBucket = getGranularitySpec().bucketInterval(new DateTime(inputRow.getTimestampFromEpoch())); if (!timeBucket.isPresent()) { return Optional.absent(); } @@ -473,7 +554,7 @@ public class HadoopDruidIndexerConfig for (final HadoopyShardSpec hadoopyShardSpec : shards) { final ShardSpec actualSpec = hadoopyShardSpec.getActualSpec(); - if (actualSpec.isInChunk(theMap)) { + if (actualSpec.isInChunk(inputRow)) { return Optional.of( new Bucket( hadoopyShardSpec.getShardNum(), @@ -484,7 +565,7 @@ public class HadoopDruidIndexerConfig } } - throw new ISE("row[%s] doesn't fit in any shard[%s]", theMap, shards); + throw new ISE("row[%s] doesn't fit in any shard[%s]", inputRow, shards); } public Set getSegmentGranularIntervals() @@ -566,6 +647,11 @@ public class HadoopDruidIndexerConfig return new Path(makeIntermediatePath(), "segmentDescriptorInfo"); } + public Path makeGroupedDataDir() + { + return new Path(makeIntermediatePath(), "groupedData"); + } + public Path makeDescriptorInfoPath(DataSegment segment) { return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", ""))); @@ -626,10 +712,5 @@ public class HadoopDruidIndexerConfig final int nIntervals = getIntervals().size(); Preconditions.checkArgument(nIntervals > 0, "intervals.size()[%s] <= 0", nIntervals); - - if (partitionByDimension()) { - Preconditions.checkNotNull(partitionDimension); - Preconditions.checkNotNull(targetPartitionSize); - } } } diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerMapper.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerMapper.java new file mode 100644 index 00000000000..651cb757023 --- /dev/null +++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerMapper.java @@ -0,0 +1,66 @@ +package com.metamx.druid.indexer; + +import com.metamx.common.RE; +import com.metamx.druid.indexer.data.StringInputRowParser; +import com.metamx.druid.input.InputRow; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.joda.time.DateTime; + +import java.io.IOException; + +public abstract class HadoopDruidIndexerMapper extends Mapper +{ + private HadoopDruidIndexerConfig config; + private StringInputRowParser parser; + + @Override + protected void setup(Context context) + throws IOException, InterruptedException + { + 
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); + parser = config.getParser(); + } + + public HadoopDruidIndexerConfig getConfig() + { + return config; + } + + public StringInputRowParser getParser() + { + return parser; + } + + @Override + protected void map( + LongWritable key, Text value, Context context + ) throws IOException, InterruptedException + { + try { + final InputRow inputRow; + try { + inputRow = parser.parse(value.toString()); + } + catch (IllegalArgumentException e) { + if (config.isIgnoreInvalidRows()) { + context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1); + return; // we're ignoring this invalid row + } else { + throw e; + } + } + + if(config.getGranularitySpec().bucketInterval(new DateTime(inputRow.getTimestampFromEpoch())).isPresent()) { + innerMap(inputRow, value, context); + } + } + catch (RuntimeException e) { + throw new RE(e, "Failure on row[%s]", value); + } + } + + abstract protected void innerMap(InputRow inputRow, Text text, Context context) + throws IOException, InterruptedException; +} diff --git a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java index 28dacd1ca9a..a2ffe8566c8 100644 --- a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java +++ b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java @@ -19,31 +19,25 @@ package com.metamx.druid.indexer; -import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.io.Closeables; import com.google.common.primitives.Longs; import com.metamx.common.ISE; -import com.metamx.common.RE; -import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; -import com.metamx.common.parsers.Parser; -import com.metamx.common.parsers.ParserUtils; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.client.DataSegment; import com.metamx.druid.index.QueryableIndex; import com.metamx.druid.index.v1.IncrementalIndex; import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.index.v1.IndexMerger; +import com.metamx.druid.indexer.data.StringInputRowParser; import com.metamx.druid.indexer.rollup.DataRollupSpec; -import com.metamx.druid.input.MapBasedInputRow; +import com.metamx.druid.input.InputRow; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -53,13 +47,11 @@ import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3native.NativeS3FileSystem; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.InvalidJobConfException; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; @@ -68,7 +60,6 @@ import 
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.joda.time.DateTime; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; @@ -78,7 +69,6 @@ import java.net.URI; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; @@ -127,7 +117,7 @@ public class IndexGeneratorJob implements Jobby job.setMapperClass(IndexGeneratorMapper.class); job.setMapOutputValueClass(Text.class); - SortableBytes.useSortableBytesAsKey(job); + SortableBytes.useSortableBytesAsMapOutputKey(job); job.setNumReduceTasks(Iterables.size(config.getAllBuckets())); job.setPartitionerClass(IndexGeneratorPartitioner.class); @@ -144,7 +134,7 @@ public class IndexGeneratorJob implements Jobby job.setJarByClass(IndexGeneratorJob.class); job.submit(); - log.info("Job submitted, status available at %s", job.getTrackingURL()); + log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL()); boolean success = job.waitForCompletion(true); @@ -159,75 +149,29 @@ public class IndexGeneratorJob implements Jobby } } - public static class IndexGeneratorMapper extends Mapper + public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper { - private HadoopDruidIndexerConfig config; - private Parser parser; - private Function timestampConverter; - @Override - protected void setup(Context context) - throws IOException, InterruptedException - { - config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); - parser = config.getDataSpec().getParser(); - timestampConverter = ParserUtils.createTimestampParser(config.getTimestampFormat()); - } - - @Override - protected void map( - LongWritable key, Text value, Context context + protected void innerMap( + InputRow inputRow, + Text text, + Context context ) throws IOException, InterruptedException { + // Group by bucket, sort by timestamp + final Optional bucket = getConfig().getBucket(inputRow); - try { - final Map values = parser.parse(value.toString()); - - final String tsStr = (String) values.get(config.getTimestampColumnName()); - final DateTime timestamp; - try { - timestamp = timestampConverter.apply(tsStr); - } - catch (IllegalArgumentException e) { - if (config.isIgnoreInvalidRows()) { - context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1); - return; // we're ignoring this invalid row - } else { - throw e; - } - } - - Optional bucket = config.getBucket( - Maps.transformEntries( - values, - new Maps.EntryTransformer() - { - @Override - public String transformEntry(@Nullable String key, @Nullable Object value) - { - if (key.equalsIgnoreCase(config.getTimestampColumnName())) { - return timestamp.toString(); - } - return value.toString(); - } - } - ) - ); - - if (bucket.isPresent()) { - // Group by bucket, sort by timestamp - context.write( - new SortableBytes( - bucket.get().toGroupKey(), - Longs.toByteArray(timestamp.getMillis()) - ).toBytesWritable(), - value - ); - } - } - catch (RuntimeException e) { - throw new RE(e, "Failure on row[%s]", value); + if(!bucket.isPresent()) { + throw new ISE("WTF?! 
No bucket found for row: %s", inputRow); } + + context.write( + new SortableBytes( + bucket.get().toGroupKey(), + Longs.toByteArray(inputRow.getTimestampFromEpoch()) + ).toBytesWritable(), + text + ); } } @@ -253,8 +197,7 @@ public class IndexGeneratorJob implements Jobby { private HadoopDruidIndexerConfig config; private List metricNames = Lists.newArrayList(); - private Function timestampConverter; - private Parser parser; + private StringInputRowParser parser; @Override protected void setup(Context context) @@ -265,8 +208,8 @@ public class IndexGeneratorJob implements Jobby for (AggregatorFactory factory : config.getRollupSpec().getAggs()) { metricNames.add(factory.getName().toLowerCase()); } - timestampConverter = ParserUtils.createTimestampParser(config.getTimestampFormat()); - parser = config.getDataSpec().getParser(); + + parser = config.getParser(); } @Override @@ -299,32 +242,10 @@ public class IndexGeneratorJob implements Jobby for (final Text value : values) { context.progress(); - Map event = parser.parse(value.toString()); - final long timestamp = timestampConverter.apply((String) event.get(config.getTimestampColumnName())) - .getMillis(); - List dimensionNames = - config.getDataSpec().hasCustomDimensions() ? - config.getDataSpec().getDimensions() : - Lists.newArrayList( - FunctionalIterable.create(event.keySet()) - .filter( - new Predicate() - { - @Override - public boolean apply(@Nullable String input) - { - return !(metricNames.contains(input.toLowerCase()) - || config.getTimestampColumnName() - .equalsIgnoreCase(input)); - } - } - ) - ); - allDimensionNames.addAll(dimensionNames); + final InputRow inputRow = parser.parse(value.toString()); + allDimensionNames.addAll(inputRow.getDimensions()); - int numRows = index.add( - new MapBasedInputRow(timestamp, dimensionNames, event) - ); + int numRows = index.add(inputRow); ++lineCount; if (numRows >= rollupSpec.rowFlushBoundary) { diff --git a/indexer/src/main/java/com/metamx/druid/indexer/SortableBytes.java b/indexer/src/main/java/com/metamx/druid/indexer/SortableBytes.java index 3abaa7951b8..394f9dacffb 100644 --- a/indexer/src/main/java/com/metamx/druid/indexer/SortableBytes.java +++ b/indexer/src/main/java/com/metamx/druid/indexer/SortableBytes.java @@ -102,7 +102,7 @@ public class SortableBytes ); } - public static void useSortableBytesAsKey(Job job) + public static void useSortableBytesAsMapOutputKey(Job job) { job.setMapOutputKeyClass(BytesWritable.class); job.setGroupingComparatorClass(SortableBytesGroupingComparator.class); diff --git a/indexer/src/main/java/com/metamx/druid/indexer/granularity/UniformGranularitySpec.java b/indexer/src/main/java/com/metamx/druid/indexer/granularity/UniformGranularitySpec.java index a1039caba1c..51d2f37d437 100644 --- a/indexer/src/main/java/com/metamx/druid/indexer/granularity/UniformGranularitySpec.java +++ b/indexer/src/main/java/com/metamx/druid/indexer/granularity/UniformGranularitySpec.java @@ -20,6 +20,9 @@ package com.metamx.druid.indexer.granularity; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.metamx.common.Granularity; import com.metamx.common.guava.Comparators; @@ -35,47 +38,47 @@ import java.util.TreeSet; public class UniformGranularitySpec implements GranularitySpec { final private Granularity granularity; - final private List intervals; + final private List inputIntervals; + final private 
ArbitraryGranularitySpec wrappedSpec; @JsonCreator public UniformGranularitySpec( @JsonProperty("gran") Granularity granularity, - @JsonProperty("intervals") List intervals + @JsonProperty("intervals") List inputIntervals ) { + List granularIntervals = Lists.newArrayList(); + + for (Interval inputInterval : inputIntervals) { + Iterables.addAll(granularIntervals, granularity.getIterable(inputInterval)); + } + this.granularity = granularity; - this.intervals = intervals; + this.inputIntervals = ImmutableList.copyOf(inputIntervals); + this.wrappedSpec = new ArbitraryGranularitySpec(granularIntervals); } @Override public SortedSet bucketIntervals() { - final TreeSet retVal = Sets.newTreeSet(Comparators.intervals()); - - for (Interval interval : intervals) { - for (Interval segmentInterval : granularity.getIterable(interval)) { - retVal.add(segmentInterval); - } - } - - return retVal; + return wrappedSpec.bucketIntervals(); } @Override public Optional bucketInterval(DateTime dt) { - return Optional.of(granularity.bucket(dt)); + return wrappedSpec.bucketInterval(dt); } - @JsonProperty + @JsonProperty("gran") public Granularity getGranularity() { return granularity; } - @JsonProperty + @JsonProperty("intervals") public Iterable getIntervals() { - return intervals; + return inputIntervals; } } diff --git a/indexer/src/main/java/com/metamx/druid/indexer/partitions/PartitionsSpec.java b/indexer/src/main/java/com/metamx/druid/indexer/partitions/PartitionsSpec.java new file mode 100644 index 00000000000..2d00cf71f06 --- /dev/null +++ b/indexer/src/main/java/com/metamx/druid/indexer/partitions/PartitionsSpec.java @@ -0,0 +1,52 @@ +package com.metamx.druid.indexer.partitions; + +import org.codehaus.jackson.annotate.JsonIgnore; +import org.codehaus.jackson.annotate.JsonProperty; + +import javax.annotation.Nullable; + +public class PartitionsSpec +{ + @Nullable + private final String partitionDimension; + + private final long targetPartitionSize; + + private final boolean assumeGrouped; + + public PartitionsSpec( + @JsonProperty("partitionDimension") @Nullable String partitionDimension, + @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize, + @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped + ) + { + this.partitionDimension = partitionDimension; + this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize; + this.assumeGrouped = assumeGrouped == null ? 
diff --git a/indexer/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexer/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java
index 6bb56df31f5..f4db1148327 100644
--- a/indexer/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java
+++ b/indexer/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -22,6 +22,7 @@ package com.metamx.druid.indexer;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
+import com.metamx.druid.indexer.partitions.PartitionsSpec;
 import com.metamx.druid.jackson.DefaultObjectMapper;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.joda.time.Interval;
@@ -67,7 +68,7 @@ public class HadoopDruidIndexerConfigTest
   }
 
   @Test
-  public void testIntervalsAndSegmentGranularity() {
+  public void testGranularitySpecLegacy() {
    // Deprecated and replaced by granularitySpec, but still supported
    final HadoopDruidIndexerConfig cfg;
 
@@ -98,9 +99,8 @@ public class HadoopDruidIndexerConfigTest
     );
   }
 
-  @Test
-  public void testCmdlineAndSegmentGranularity() {
+  public void testGranularitySpecPostConstructorIntervals() {
    // Deprecated and replaced by granularitySpec, but still supported
    final HadoopDruidIndexerConfig cfg;
 
@@ -133,7 +133,7 @@ public class HadoopDruidIndexerConfigTest
   }
 
   @Test
-  public void testInvalidCombination() {
+  public void testInvalidGranularityCombination() {
    boolean thrown = false;
    try {
      final HadoopDruidIndexerConfig cfg = jsonMapper.readValue(
@@ -154,4 +154,160 @@ public class HadoopDruidIndexerConfigTest
 
    Assert.assertTrue("Exception thrown", thrown);
   }
+
+  @Test
+  public void testPartitionsSpecNoPartitioning() {
+    final HadoopDruidIndexerConfig cfg;
+
+    try {
+      cfg = jsonMapper.readValue(
+          "{}",
+          HadoopDruidIndexerConfig.class
+      );
+    } catch(Exception e) {
+      throw Throwables.propagate(e);
+    }
+
+    final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
+
+    Assert.assertEquals(
+        "isDeterminingPartitions",
+        partitionsSpec.isDeterminingPartitions(),
+        false
+    );
+  }
+
+  @Test
+  public void testPartitionsSpecAutoDimension() {
+    final HadoopDruidIndexerConfig cfg;
+
+    try {
+      cfg = jsonMapper.readValue(
+          "{"
+          + "\"partitionsSpec\":{"
+          + "   \"targetPartitionSize\":100"
+          + " }"
+          + "}",
+          HadoopDruidIndexerConfig.class
+      );
+    } catch(Exception e) {
+      throw Throwables.propagate(e);
+    }
+
+    final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
+
+    Assert.assertEquals(
+        "isDeterminingPartitions",
+        partitionsSpec.isDeterminingPartitions(),
+        true
+    );
+
+    Assert.assertEquals(
+        "getTargetPartitionSize",
+        partitionsSpec.getTargetPartitionSize(),
+        100
+    );
+
+    Assert.assertEquals(
+        "getPartitionDimension",
+        partitionsSpec.getPartitionDimension(),
+        null
+    );
+  }
+
+  @Test
+  public void testPartitionsSpecSpecificDimension() {
+    final HadoopDruidIndexerConfig cfg;
+
+    try {
+      cfg = jsonMapper.readValue(
+          "{"
+          + "\"partitionsSpec\":{"
+          + "   \"targetPartitionSize\":100,"
+          + "   \"partitionDimension\":\"foo\""
+          + " }"
+          + "}",
+          HadoopDruidIndexerConfig.class
+      );
+    } catch(Exception e) {
+      throw Throwables.propagate(e);
+    }
+
+    final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
+
+    Assert.assertEquals(
+        "isDeterminingPartitions",
+        partitionsSpec.isDeterminingPartitions(),
+        true
+    );
+
+    Assert.assertEquals(
+        "getTargetPartitionSize",
+        partitionsSpec.getTargetPartitionSize(),
+        100
+    );
+
+    Assert.assertEquals(
+        "getPartitionDimension",
+        partitionsSpec.getPartitionDimension(),
+        "foo"
+    );
+  }
+
+  @Test
+  public void testPartitionsSpecLegacy() {
+    final HadoopDruidIndexerConfig cfg;
+
+    try {
+      cfg = jsonMapper.readValue(
+          "{"
+          + "\"targetPartitionSize\":100,"
+          + "\"partitionDimension\":\"foo\""
+          + "}",
+          HadoopDruidIndexerConfig.class
+      );
+    } catch(Exception e) {
+      throw Throwables.propagate(e);
+    }
+
+    final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
+
+    Assert.assertEquals(
+        "isDeterminingPartitions",
+        partitionsSpec.isDeterminingPartitions(),
+        true
+    );
+
+    Assert.assertEquals(
+        "getTargetPartitionSize",
+        partitionsSpec.getTargetPartitionSize(),
+        100
+    );
+
+    Assert.assertEquals(
+        "getPartitionDimension",
+        partitionsSpec.getPartitionDimension(),
+        "foo"
+    );
+  }
+
+  @Test
+  public void testInvalidPartitionsCombination() {
+    boolean thrown = false;
+    try {
+      final HadoopDruidIndexerConfig cfg = jsonMapper.readValue(
+          "{"
+          + "\"targetPartitionSize\":100,"
+          + "\"partitionsSpec\":{"
+          + "   \"targetPartitionSize\":100"
+          + " }"
+          + "}",
+          HadoopDruidIndexerConfig.class
+      );
+    } catch(Exception e) {
+      thrown = true;
+    }
+
+    Assert.assertTrue("Exception thrown", thrown);
+  }
 }
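The partitionsSpec tests above establish that HadoopDruidIndexerConfig accepts the partitioning settings either nested under "partitionsSpec" or as the legacy top-level targetPartitionSize/partitionDimension fields, and rejects a spec that mixes both. A condensed sketch of that round trip, not part of the patch, assuming the same DefaultObjectMapper setup the test class uses for its jsonMapper:

// Sketch mirroring the partitionsSpec tests above (not part of the patch).
import com.metamx.druid.indexer.HadoopDruidIndexerConfig;
import com.metamx.druid.indexer.partitions.PartitionsSpec;
import com.metamx.druid.jackson.DefaultObjectMapper;
import org.codehaus.jackson.map.ObjectMapper;

public class PartitionsSpecJsonSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper jsonMapper = new DefaultObjectMapper();

    // New, nested form.
    final HadoopDruidIndexerConfig nested = jsonMapper.readValue(
        "{\"partitionsSpec\":{\"targetPartitionSize\":100,\"partitionDimension\":\"foo\"}}",
        HadoopDruidIndexerConfig.class
    );

    // Legacy top-level form; should yield an equivalent PartitionsSpec.
    final HadoopDruidIndexerConfig legacy = jsonMapper.readValue(
        "{\"targetPartitionSize\":100,\"partitionDimension\":\"foo\"}",
        HadoopDruidIndexerConfig.class
    );

    final PartitionsSpec a = nested.getPartitionsSpec();
    final PartitionsSpec b = legacy.getPartitionsSpec();
    System.out.println(a.getTargetPartitionSize() == b.getTargetPartitionSize());    // true
    System.out.println(a.getPartitionDimension().equals(b.getPartitionDimension())); // true

    // Supplying both forms at once is rejected, as testInvalidPartitionsCombination expects.
  }
}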
diff --git a/indexer/src/test/java/com/metamx/druid/indexer/granularity/ArbitraryGranularityTest.java b/indexer/src/test/java/com/metamx/druid/indexer/granularity/ArbitraryGranularityTest.java
index efc8113ee1b..0044d7d13e6 100644
--- a/indexer/src/test/java/com/metamx/druid/indexer/granularity/ArbitraryGranularityTest.java
+++ b/indexer/src/test/java/com/metamx/druid/indexer/granularity/ArbitraryGranularityTest.java
@@ -69,6 +69,12 @@ public class ArbitraryGranularityTest
         spec.bucketInterval(new DateTime("2012-01-03T01Z"))
     );
 
+    Assert.assertEquals(
+        "2012-01-04T01Z",
+        Optional.absent(),
+        spec.bucketInterval(new DateTime("2012-01-04T01Z"))
+    );
+
     Assert.assertEquals(
         "2012-01-07T23:59:59.999Z",
         Optional.of(new Interval("2012-01-07T00Z/2012-01-08T00Z")),
diff --git a/indexer/src/test/java/com/metamx/druid/indexer/granularity/UniformGranularityTest.java b/indexer/src/test/java/com/metamx/druid/indexer/granularity/UniformGranularityTest.java
index 1f37da56de0..ab21be5f9f5 100644
--- a/indexer/src/test/java/com/metamx/druid/indexer/granularity/UniformGranularityTest.java
+++ b/indexer/src/test/java/com/metamx/druid/indexer/granularity/UniformGranularityTest.java
@@ -72,6 +72,12 @@ public class UniformGranularityTest
         spec.bucketInterval(new DateTime("2012-01-03T01Z"))
     );
 
+    Assert.assertEquals(
+        "2012-01-04T01Z",
+        Optional.absent(),
+        spec.bucketInterval(new DateTime("2012-01-04T01Z"))
+    );
+
     Assert.assertEquals(
         "2012-01-07T23:59:59.999Z",
         Optional.of(new Interval("2012-01-07T00Z/2012-01-08T00Z")),