diff --git a/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java b/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java index a40642ebec5..e257d4ed6b7 100644 --- a/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java +++ b/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java @@ -23,21 +23,22 @@ import com.google.common.base.Charsets; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.google.common.base.Splitter; +import com.google.common.base.Throwables; +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.PeekingIterator; import com.google.common.io.Closeables; -import com.metamx.common.IAE; -import com.metamx.common.Pair; +import com.metamx.common.ISE; import com.metamx.common.guava.nary.BinaryFn; import com.metamx.common.logger.Logger; -import com.metamx.common.parsers.Parser; -import com.metamx.common.parsers.ParserUtils; import com.metamx.druid.CombiningIterable; +import com.metamx.druid.input.InputRow; import com.metamx.druid.shard.NoneShardSpec; import com.metamx.druid.shard.ShardSpec; import com.metamx.druid.shard.SingleDimensionShardSpec; @@ -45,7 +46,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.InvalidJobConfException; @@ -56,8 +57,11 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.codehaus.jackson.type.TypeReference; import org.joda.time.DateTime; import org.joda.time.DateTimeComparator; @@ -65,20 +69,26 @@ import org.joda.time.Interval; import java.io.IOException; import java.io.OutputStream; -import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; /** + * Determines appropriate ShardSpecs for a job by determining whether or not partitioning is necessary, and if so, + * choosing the highest cardinality dimension that satisfies the criteria: + * + * <ul> + * <li>Must have exactly one value per row.</li> + * <li>Must not generate oversized shards: all rows with a given value land in the same shard, so a dimension + * dominated by a single value can produce shards far larger than the target size.</li> + * </ul> + * */ public class DeterminePartitionsJob implements Jobby { private static final Logger log = new Logger(DeterminePartitionsJob.class); - private static final Joiner keyJoiner = Joiner.on(","); - private static final Splitter keySplitter = Splitter.on(","); private static final Joiner tabJoiner = HadoopDruidIndexerConfig.tabJoiner; private static final Splitter tabSplitter = HadoopDruidIndexerConfig.tabSplitter; @@ -91,96 +101,171 @@
public class DeterminePartitionsJob implements Jobby this.config = config; } - public boolean run() + public static void injectSystemProperties(Job job) { - try { - Job job = new Job( - new Configuration(), - String.format("%s-determine_partitions-%s", config.getDataSource(), config.getIntervals()) - ); - - job.getConfiguration().set("io.sort.record.percent", "0.19"); - for (String propName : System.getProperties().stringPropertyNames()) { - Configuration conf = job.getConfiguration(); - if (propName.startsWith("hadoop.")) { - conf.set(propName.substring("hadoop.".length()), System.getProperty(propName)); - } + for (String propName : System.getProperties().stringPropertyNames()) { + Configuration conf = job.getConfiguration(); + if (propName.startsWith("hadoop.")) { + conf.set(propName.substring("hadoop.".length()), System.getProperty(propName)); } - - job.setInputFormatClass(TextInputFormat.class); - - job.setMapperClass(DeterminePartitionsMapper.class); - job.setMapOutputValueClass(Text.class); - - SortableBytes.useSortableBytesAsKey(job); - - job.setCombinerClass(DeterminePartitionsCombiner.class); - job.setReducerClass(DeterminePartitionsReducer.class); - job.setOutputKeyClass(BytesWritable.class); - job.setOutputValueClass(Text.class); - job.setOutputFormatClass(DeterminePartitionsJob.DeterminePartitionsOutputFormat.class); - FileOutputFormat.setOutputPath(job, config.makeIntermediatePath()); - - config.addInputPaths(job); - config.intoConfiguration(job); - - job.setJarByClass(DeterminePartitionsJob.class); - - job.submit(); - log.info("Job submitted, status available at %s", job.getTrackingURL()); - - final boolean retVal = job.waitForCompletion(true); - - if (retVal) { - log.info("Job completed, loading up partitions for intervals[%s].", config.getSegmentGranularIntervals()); - FileSystem fileSystem = null; - Map> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance()); - int shardCount = 0; - for (Interval segmentGranularity : config.getSegmentGranularIntervals()) { - DateTime bucket = segmentGranularity.getStart(); - - final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0)); - if (fileSystem == null) { - fileSystem = partitionInfoPath.getFileSystem(job.getConfiguration()); - } - if (fileSystem.exists(partitionInfoPath)) { - List specs = config.jsonMapper.readValue( - Utils.openInputStream(job, partitionInfoPath), new TypeReference>() - { - } - ); - - List actualSpecs = Lists.newArrayListWithExpectedSize(specs.size()); - for (int i = 0; i < specs.size(); ++i) { - actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++)); - log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i)); - } - - shardSpecs.put(bucket, actualSpecs); - } - else { - log.info("Path[%s] didn't exist!?", partitionInfoPath); - } - } - config.setShardSpecs(shardSpecs); - } - else { - log.info("Job completed unsuccessfully."); - } - - return retVal; - } - catch (Exception e) { - throw new RuntimeException(e); } } - public static class DeterminePartitionsMapper extends Mapper + public boolean run() + { + try { + /* + * Group by (timestamp, dimensions) so we can correctly count dimension values as they would appear + * in the final segment. 
+ */ + final Job groupByJob = new Job( + new Configuration(), + String.format("%s-determine_partitions_groupby-%s", config.getDataSource(), config.getIntervals()) + ); + + injectSystemProperties(groupByJob); + groupByJob.setInputFormatClass(TextInputFormat.class); + groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class); + groupByJob.setMapOutputKeyClass(Text.class); + groupByJob.setMapOutputValueClass(NullWritable.class); + groupByJob.setCombinerClass(DeterminePartitionsGroupByReducer.class); + groupByJob.setReducerClass(DeterminePartitionsGroupByReducer.class); + groupByJob.setOutputKeyClass(Text.class); + groupByJob.setOutputValueClass(NullWritable.class); + groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class); + groupByJob.setJarByClass(DeterminePartitionsJob.class); + + config.addInputPaths(groupByJob); + config.intoConfiguration(groupByJob); + FileOutputFormat.setOutputPath(groupByJob, config.makeGroupedDataDir()); + + groupByJob.submit(); + log.info("Job submitted, status available at: %s", groupByJob.getTrackingURL()); + + if(!groupByJob.waitForCompletion(true)) { + log.error("Job failed: %s", groupByJob.getJobID().toString()); + return false; + } + + /* + * Read grouped data from previous job and determine appropriate partitions. + */ + final Job dimSelectionJob = new Job( + new Configuration(), + String.format("%s-determine_partitions_dimselection-%s", config.getDataSource(), config.getIntervals()) + ); + + dimSelectionJob.getConfiguration().set("io.sort.record.percent", "0.19"); + + injectSystemProperties(dimSelectionJob); + dimSelectionJob.setInputFormatClass(SequenceFileInputFormat.class); + dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionMapper.class); + SortableBytes.useSortableBytesAsMapOutputKey(dimSelectionJob); + dimSelectionJob.setMapOutputValueClass(Text.class); + dimSelectionJob.setCombinerClass(DeterminePartitionsDimSelectionCombiner.class); + dimSelectionJob.setReducerClass(DeterminePartitionsDimSelectionReducer.class); + dimSelectionJob.setOutputKeyClass(BytesWritable.class); + dimSelectionJob.setOutputValueClass(Text.class); + dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class); + dimSelectionJob.setJarByClass(DeterminePartitionsJob.class); + + config.intoConfiguration(dimSelectionJob); + FileInputFormat.addInputPath(dimSelectionJob, config.makeGroupedDataDir()); + FileOutputFormat.setOutputPath(dimSelectionJob, config.makeIntermediatePath()); + + dimSelectionJob.submit(); + log.info("Job submitted, status available at: %s", dimSelectionJob.getTrackingURL()); + + if(!dimSelectionJob.waitForCompletion(true)) { + log.error("Job failed: %s", dimSelectionJob.getJobID().toString()); + return false; + } + + /* + * Load partitions determined by the previous job. 
+ */ + + log.info("Job completed, loading up partitions for intervals[%s].", config.getSegmentGranularIntervals()); + FileSystem fileSystem = null; + Map> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance()); + int shardCount = 0; + for (Interval segmentGranularity : config.getSegmentGranularIntervals()) { + DateTime bucket = segmentGranularity.getStart(); + + final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0)); + if (fileSystem == null) { + fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration()); + } + if (fileSystem.exists(partitionInfoPath)) { + List specs = config.jsonMapper.readValue( + Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference>() + { + } + ); + + List actualSpecs = Lists.newArrayListWithExpectedSize(specs.size()); + for (int i = 0; i < specs.size(); ++i) { + actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++)); + log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i)); + } + + shardSpecs.put(bucket, actualSpecs); + } + else { + log.info("Path[%s] didn't exist!?", partitionInfoPath); + } + } + config.setShardSpecs(shardSpecs); + + return true; + } catch(Exception e) { + throw Throwables.propagate(e); + } + } + + public static class DeterminePartitionsGroupByMapper extends HadoopDruidIndexerMapper + { + @Override + protected void innerMap( + InputRow inputRow, + Text text, + Context context + ) throws IOException, InterruptedException + { + // Create group key + // TODO -- There are more efficient ways to do this + final Map> dims = Maps.newTreeMap(); + for(final String dim : inputRow.getDimensions()) { + final Set dimValues = ImmutableSortedSet.copyOf(inputRow.getDimension(dim)); + if(dimValues.size() > 0) { + dims.put(dim, dimValues); + } + } + final List groupKey = ImmutableList.of( + getConfig().getRollupSpec().getRollupGranularity().truncate(inputRow.getTimestampFromEpoch()), + dims + ); + context.write(new Text(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey)), NullWritable.get()); + } + } + + public static class DeterminePartitionsGroupByReducer extends Reducer + { + @Override + protected void reduce( + Text key, + Iterable values, + Context context + ) throws IOException, InterruptedException + { + context.write(key, NullWritable.get()); + } + } + + public static class DeterminePartitionsDimSelectionMapper extends Mapper { private HadoopDruidIndexerConfig config; private String partitionDimension; - private Parser parser; - private Function timestampConverter; @Override protected void setup(Context context) @@ -188,49 +273,49 @@ public class DeterminePartitionsJob implements Jobby { config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); partitionDimension = config.getPartitionDimension(); - parser = config.getDataSpec().getParser(); - timestampConverter = ParserUtils.createTimestampParser(config.getTimestampFormat()); } @Override protected void map( - LongWritable key, Text value, Context context + Text key, NullWritable value, Context context ) throws IOException, InterruptedException { - Map values = parser.parse(value.toString()); - final DateTime timestamp; - final String tsStr = (String) values.get(config.getTimestampColumnName()); - try { - timestamp = timestampConverter.apply(tsStr); - } - catch(IllegalArgumentException e) { - if(config.isIgnoreInvalidRows()) { - context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1); - return; // we're ignoring 
this invalid row - } - else { - throw e; - } - } + final List timeAndDims = + HadoopDruidIndexerConfig.jsonMapper.readValue( + key.getBytes(), new TypeReference>() {} + ); + final DateTime timestamp = new DateTime(timeAndDims.get(0)); + final Map dims = (Map) timeAndDims.get(1); final Optional maybeInterval = config.getGranularitySpec().bucketInterval(timestamp); - if(maybeInterval.isPresent()) { - final DateTime bucket = maybeInterval.get().getStart(); - final String outKey = keyJoiner.join(bucket.toString(), partitionDimension); - final Object dimValue = values.get(partitionDimension); - if (! (dimValue instanceof String)) { - throw new IAE("Cannot partition on a tag-style dimension[%s], line was[%s]", partitionDimension, value); + if(!maybeInterval.isPresent()) { + throw new ISE("WTF?! No bucket found for timestamp: %s", timestamp); + } + + final Interval interval = maybeInterval.get(); + final byte[] groupKey = interval.getStart().toString().getBytes(Charsets.UTF_8); + + for(final Map.Entry dimAndValues : dims.entrySet()) { + final String dim = dimAndValues.getKey(); + + if(partitionDimension == null || partitionDimension.equals(dim)) { + final List dimValues = (List) dimAndValues.getValue(); + + if(dimValues.size() == 1) { + // Emit this value. + write(context, groupKey, new DimValueCount(dim, dimValues.get(0), 1)); + } else { + // This dimension is unsuitable for partitioning. Poison it by emitting a negative value. + write(context, groupKey, new DimValueCount(dim, "", -1)); + } } - - final byte[] groupKey = outKey.getBytes(Charsets.UTF_8); - write(context, groupKey, "", 1); - write(context, groupKey, (String) dimValue, 1); } } } - private static abstract class DeterminePartitionsBaseReducer extends Reducer + private static abstract class DeterminePartitionsDimSelectionBaseReducer + extends Reducer { protected static volatile HadoopDruidIndexerConfig config = null; @@ -240,7 +325,7 @@ public class DeterminePartitionsJob implements Jobby throws IOException, InterruptedException { if (config == null) { - synchronized (DeterminePartitionsBaseReducer.class) { + synchronized (DeterminePartitionsDimSelectionBaseReducer.class) { if (config == null) { config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); } @@ -255,166 +340,277 @@ public class DeterminePartitionsJob implements Jobby { SortableBytes keyBytes = SortableBytes.fromBytesWritable(key); - final Iterable> combinedIterable = combineRows(values); + final Iterable combinedIterable = combineRows(values); innerReduce(context, keyBytes, combinedIterable); } protected abstract void innerReduce( - Context context, SortableBytes keyBytes, Iterable> combinedIterable + Context context, SortableBytes keyBytes, Iterable combinedIterable ) throws IOException, InterruptedException; - private Iterable> combineRows(Iterable input) + private Iterable combineRows(Iterable input) { - return new CombiningIterable>( + return new CombiningIterable( Iterables.transform( input, - new Function>() + new Function() { @Override - public Pair apply(Text input) + public DimValueCount apply(Text input) { - Iterator splits = tabSplitter.split(input.toString()).iterator(); - return new Pair(splits.next(), Long.parseLong(splits.next())); + return DimValueCount.fromText(input); } } ), - new Comparator>() + new Comparator() { @Override - public int compare(Pair o1, Pair o2) + public int compare(DimValueCount o1, DimValueCount o2) { - return o1.lhs.compareTo(o2.lhs); + return ComparisonChain.start().compare(o1.dim, o2.dim).compare(o1.value, 
o2.value).result(); } }, - new BinaryFn, Pair, Pair>() + new BinaryFn() { @Override - public Pair apply(Pair arg1, Pair arg2) + public DimValueCount apply(DimValueCount arg1, DimValueCount arg2) { if (arg2 == null) { return arg1; } - return new Pair(arg1.lhs, arg1.rhs + arg2.rhs); + // Respect "poisoning" (negative values mean we can't use this dimension) + final int newNumRows = (arg1.numRows >= 0 && arg2.numRows >= 0 ? arg1.numRows + arg2.numRows : -1); + return new DimValueCount(arg1.dim, arg1.value, newNumRows); } } ); } } - public static class DeterminePartitionsCombiner extends DeterminePartitionsBaseReducer + public static class DeterminePartitionsDimSelectionCombiner extends DeterminePartitionsDimSelectionBaseReducer { @Override protected void innerReduce( - Context context, SortableBytes keyBytes, Iterable> combinedIterable + Context context, SortableBytes keyBytes, Iterable combinedIterable ) throws IOException, InterruptedException { - for (Pair pair : combinedIterable) { - write(context, keyBytes.getGroupKey(), pair.lhs, pair.rhs); + for (DimValueCount dvc : combinedIterable) { + write(context, keyBytes.getGroupKey(), dvc); } } } - public static class DeterminePartitionsReducer extends DeterminePartitionsBaseReducer + public static class DeterminePartitionsDimSelectionReducer extends DeterminePartitionsDimSelectionBaseReducer { - String previousBoundary; - long runningTotal; + private static final double SHARD_COMBINE_THRESHOLD = 0.25; + private static final double SHARD_OVERSIZE_THRESHOLD = 1.5; @Override protected void innerReduce( - Context context, SortableBytes keyBytes, Iterable> combinedIterable + Context context, SortableBytes keyBytes, Iterable combinedIterable ) throws IOException, InterruptedException { - PeekingIterator> iterator = Iterators.peekingIterator(combinedIterable.iterator()); - Pair totalPair = iterator.next(); + PeekingIterator iterator = Iterators.peekingIterator(combinedIterable.iterator()); - Preconditions.checkState(totalPair.lhs.equals(""), "Total pair value was[%s]!?", totalPair.lhs); - long totalRows = totalPair.rhs; + // "iterator" will take us over many candidate dimensions + DimPartitions currentDimPartitions = null; + DimPartition currentDimPartition = null; + String currentDimPartitionStart = null; + boolean currentDimSkip = false; - long numPartitions = Math.max(totalRows / config.getTargetPartitionSize(), 1); - long expectedRowsPerPartition = totalRows / numPartitions; + // We'll store possible partitions in here + final Map dimPartitionss = Maps.newHashMap(); - class PartitionsList extends ArrayList - { - } - List partitions = new PartitionsList(); + while(iterator.hasNext()) { + final DimValueCount dvc = iterator.next(); - runningTotal = 0; - Pair prev = null; - previousBoundary = null; - while (iterator.hasNext()) { - Pair curr = iterator.next(); - - if (runningTotal > expectedRowsPerPartition) { - Preconditions.checkNotNull( - prev, "Prev[null] while runningTotal[%s] was > expectedRows[%s]!?", runningTotal, expectedRowsPerPartition - ); - - addPartition(partitions, curr.lhs); + if(currentDimPartitions == null || !currentDimPartitions.dim.equals(dvc.dim)) { + // Starting a new dimension! Exciting! 
+ currentDimPartitions = new DimPartitions(dvc.dim); + currentDimPartition = new DimPartition(); + currentDimPartitionStart = null; + currentDimSkip = false; } - runningTotal += curr.rhs; - prev = curr; + // Respect poisoning + if(!currentDimSkip && dvc.numRows < 0) { + log.info("Cannot partition on multi-valued dimension: %s", dvc.dim); + currentDimSkip = true; + } + + if(currentDimSkip) { + continue; + } + + // See if we need to cut a new partition ending immediately before this dimension value + if(currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows >= config.getTargetPartitionSize()) { + final ShardSpec shardSpec = new SingleDimensionShardSpec( + currentDimPartitions.dim, + currentDimPartitionStart, + dvc.value, + currentDimPartitions.partitions.size() + ); + + log.info( + "Adding possible shard with %,d rows and %,d unique values: %s", + currentDimPartition.rows, + currentDimPartition.cardinality, + shardSpec + ); + + currentDimPartition.shardSpec = shardSpec; + currentDimPartitions.partitions.add(currentDimPartition); + currentDimPartition = new DimPartition(); + currentDimPartitionStart = dvc.value; + } + + // Update counters + currentDimPartition.cardinality ++; + currentDimPartition.rows += dvc.numRows; + + if(!iterator.hasNext() || !currentDimPartitions.dim.equals(iterator.peek().dim)) { + // Finalize the current dimension + + if(currentDimPartition.rows > 0) { + // One more shard to go + final ShardSpec shardSpec; + + if (currentDimPartitions.partitions.isEmpty()) { + shardSpec = new NoneShardSpec(); + } else { + if(currentDimPartition.rows < config.getTargetPartitionSize() * SHARD_COMBINE_THRESHOLD) { + // Combine with previous shard + final DimPartition previousDimPartition = currentDimPartitions.partitions.remove( + currentDimPartitions.partitions.size() - 1 + ); + + final SingleDimensionShardSpec previousShardSpec = (SingleDimensionShardSpec) previousDimPartition.shardSpec; + + shardSpec = new SingleDimensionShardSpec( + currentDimPartitions.dim, + previousShardSpec.getStart(), + null, + previousShardSpec.getPartitionNum() + ); + + log.info("Removing possible shard: %s", previousShardSpec); + + currentDimPartition.rows += previousDimPartition.rows; + currentDimPartition.cardinality += previousDimPartition.cardinality; + } else { + // Create new shard + shardSpec = new SingleDimensionShardSpec( + currentDimPartitions.dim, + currentDimPartitionStart, + null, + currentDimPartitions.partitions.size() + ); + } + } + + log.info( + "Adding possible shard with %,d rows and %,d unique values: %s", + currentDimPartition.rows, + currentDimPartition.cardinality, + shardSpec + ); + + currentDimPartition.shardSpec = shardSpec; + currentDimPartitions.partitions.add(currentDimPartition); + } + + log.info( + "Completed dimension[%s]: %,d possible shards with %,d unique values", + currentDimPartitions.dim, + currentDimPartitions.partitions.size(), + currentDimPartitions.getCardinality() + ); + + // Add ourselves to the partitions map + dimPartitionss.put(currentDimPartitions.dim, currentDimPartitions); + } } - if (partitions.isEmpty()) { - partitions.add(new NoneShardSpec()); - } else if (((double) runningTotal / (double) expectedRowsPerPartition) < 0.25) { - final SingleDimensionShardSpec lastSpec = (SingleDimensionShardSpec) partitions.remove(partitions.size() - 1); - partitions.add( - new SingleDimensionShardSpec( - config.getPartitionDimension(), - lastSpec.getStart(), - null, - lastSpec.getPartitionNum() - ) - ); - } else { - partitions.add( - new 
SingleDimensionShardSpec( - config.getPartitionDimension(), - previousBoundary, - null, - partitions.size() - ) - ); + // Choose best dimension + if(dimPartitionss.isEmpty()) { + throw new ISE("No suitable partitioning dimension found!"); } - DateTime bucket = new DateTime( - Iterables.get(keySplitter.split(new String(keyBytes.getGroupKey(), Charsets.UTF_8)), 0) - ); - OutputStream out = Utils.makePathAndOutputStream( + final int totalRows = dimPartitionss.values().iterator().next().getRows(); + + int maxCardinality = -1; + DimPartitions maxCardinalityPartitions = null; + + for(final DimPartitions dimPartitions : dimPartitionss.values()) { + if(dimPartitions.getRows() != totalRows) { + throw new ISE( + "WTF?! Dimension[%s] row count %,d != expected row count %,d", + dimPartitions.dim, + dimPartitions.getRows(), + totalRows + ); + } + + // Make sure none of these shards are oversized + boolean oversized = false; + for(final DimPartition partition : dimPartitions.partitions) { + if(partition.rows > config.getTargetPartitionSize() * SHARD_OVERSIZE_THRESHOLD) { + log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec); + oversized = true; + } + } + + if(oversized) { + continue; + } + + if(dimPartitions.getCardinality() > maxCardinality) { + maxCardinality = dimPartitions.getCardinality(); + maxCardinalityPartitions = dimPartitions; + } + } + + if(maxCardinalityPartitions == null) { + throw new ISE("No suitable partitioning dimension found!"); + } + + final DateTime bucket = new DateTime(new String(keyBytes.getGroupKey(), Charsets.UTF_8)); + final OutputStream out = Utils.makePathAndOutputStream( context, config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0)), config.isOverwriteFiles() ); - for (ShardSpec partition : partitions) { - log.info("%s", partition); + final List chosenShardSpecs = Lists.transform( + maxCardinalityPartitions.partitions, new Function() + { + @Override + public ShardSpec apply(DimPartition dimPartition) + { + return dimPartition.shardSpec; + } + } + ); + + log.info("Chosen partitions:"); + for (ShardSpec shardSpec : chosenShardSpecs) { + log.info(" %s", shardSpec); } try { - config.jsonMapper.writeValue(out, partitions); + // For some reason this used to work without writerWithType, but now it appears to forget to write "type" + // info for the ShardSpecs (so they cannot be deserialized). 
+ HadoopDruidIndexerConfig.jsonMapper.writerWithType(new TypeReference>() {}).writeValue( + out, + chosenShardSpecs + ); } finally { Closeables.close(out, false); } } - - private void addPartition(List partitions, String boundary) - { - partitions.add( - new SingleDimensionShardSpec( - config.getPartitionDimension(), - previousBoundary, - boundary, - partitions.size() - ) - ); - previousBoundary = boundary; - runningTotal = 0; - } } - public static class DeterminePartitionsOutputFormat extends FileOutputFormat + public static class DeterminePartitionsDimSelectionOutputFormat extends FileOutputFormat { @Override public RecordWriter getRecordWriter(final TaskAttemptContext job) throws IOException, InterruptedException @@ -444,17 +640,81 @@ public class DeterminePartitionsJob implements Jobby } } + private static class DimPartitions + { + public final String dim; + public final List partitions = Lists.newArrayList(); + + private DimPartitions(String dim) + { + this.dim = dim; + } + + public int getCardinality() + { + int sum = 0; + for(final DimPartition dimPartition : partitions) { + sum += dimPartition.cardinality; + } + return sum; + } + + public int getRows() + { + int sum = 0; + for(final DimPartition dimPartition : partitions) { + sum += dimPartition.rows; + } + return sum; + } + } + + private static class DimPartition + { + public ShardSpec shardSpec = null; + public int cardinality = 0; + public int rows = 0; + } + + private static class DimValueCount + { + public final String dim; + public final String value; + public final int numRows; + + private DimValueCount(String dim, String value, int numRows) + { + this.dim = dim; + this.value = value; + this.numRows = numRows; + } + + public Text toText() + { + return new Text(tabJoiner.join(dim, String.valueOf(numRows), value)); + } + + public static DimValueCount fromText(Text text) + { + final Iterator splits = tabSplitter.limit(3).split(text.toString()).iterator(); + final String dim = splits.next(); + final int numRows = Integer.parseInt(splits.next()); + final String value = splits.next(); + + return new DimValueCount(dim, value, numRows); + } + } + private static void write( TaskInputOutputContext context, final byte[] groupKey, - String value, - long numRows + DimValueCount dimValueCount ) throws IOException, InterruptedException { context.write( - new SortableBytes(groupKey, value.getBytes(HadoopDruidIndexerConfig.javaNativeCharset)).toBytesWritable(), - new Text(tabJoiner.join(value, numRows)) + new SortableBytes(groupKey, tabJoiner.join(dimValueCount.dim, dimValueCount.value).getBytes(HadoopDruidIndexerConfig.javaNativeCharset)).toBytesWritable(), + dimValueCount.toText() ); } } diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java index 403484b9c61..d5034ef9966 100644 --- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java @@ -34,15 +34,19 @@ import com.metamx.common.MapUtils; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; import com.metamx.druid.RegisteringNode; +import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.client.DataSegment; import com.metamx.druid.index.v1.serde.Registererer; import com.metamx.druid.indexer.data.DataSpec; +import com.metamx.druid.indexer.data.StringInputRowParser; +import 
com.metamx.druid.indexer.data.TimestampSpec; import com.metamx.druid.indexer.data.ToLowercaseDataSpec; import com.metamx.druid.indexer.granularity.GranularitySpec; import com.metamx.druid.indexer.granularity.UniformGranularitySpec; import com.metamx.druid.indexer.path.PathSpec; import com.metamx.druid.indexer.rollup.DataRollupSpec; import com.metamx.druid.indexer.updater.UpdaterJobSpec; +import com.metamx.druid.input.InputRow; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.shard.ShardSpec; import com.metamx.druid.utils.JodaUtils; @@ -60,8 +64,6 @@ import org.joda.time.format.ISODateTimeFormat; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; import java.nio.charset.Charset; import java.util.Arrays; import java.util.Collections; @@ -237,6 +239,11 @@ public class HadoopDruidIndexerConfig this.timestampFormat = timestampFormat; } + public TimestampSpec getTimestampSpec() + { + return new TimestampSpec(timestampColumnName, timestampFormat); + } + @JsonProperty public DataSpec getDataSpec() { @@ -248,6 +255,32 @@ public class HadoopDruidIndexerConfig this.dataSpec = new ToLowercaseDataSpec(dataSpec); } + public StringInputRowParser getParser() + { + final List dimensionExclusions; + + if(getDataSpec().hasCustomDimensions()) { + dimensionExclusions = null; + } else { + dimensionExclusions = Lists.newArrayList(); + dimensionExclusions.add(getTimestampColumnName()); + dimensionExclusions.addAll( + Lists.transform( + getRollupSpec().getAggs(), new Function() + { + @Override + public String apply(AggregatorFactory aggregatorFactory) + { + return aggregatorFactory.getName(); + } + } + ) + ); + } + + return new StringInputRowParser(getTimestampSpec(), getDataSpec(), dimensionExclusions); + } + @Deprecated @JsonProperty public void setSegmentGranularity(Granularity segmentGranularity) @@ -335,7 +368,7 @@ public class HadoopDruidIndexerConfig public boolean partitionByDimension() { - return partitionDimension != null; + return targetPartitionSize != null; } @JsonProperty @@ -447,21 +480,15 @@ public class HadoopDruidIndexerConfig ********************************************/ /** - * Get the proper bucket for this "row" + * Get the proper bucket for some input row. 
* - * @param theMap a Map that represents a "row", keys are column names, values are, well, values + * @param inputRow an InputRow * * @return the Bucket that this row belongs to */ - public Optional getBucket(Map theMap) + public Optional getBucket(InputRow inputRow) { - final Optional timeBucket = getGranularitySpec().bucketInterval( - new DateTime( - theMap.get( - getTimestampColumnName() - ) - ) - ); + final Optional timeBucket = getGranularitySpec().bucketInterval(new DateTime(inputRow.getTimestampFromEpoch())); if (!timeBucket.isPresent()) { return Optional.absent(); } @@ -473,7 +500,7 @@ public class HadoopDruidIndexerConfig for (final HadoopyShardSpec hadoopyShardSpec : shards) { final ShardSpec actualSpec = hadoopyShardSpec.getActualSpec(); - if (actualSpec.isInChunk(theMap)) { + if (actualSpec.isInChunk(inputRow)) { return Optional.of( new Bucket( hadoopyShardSpec.getShardNum(), @@ -484,7 +511,7 @@ public class HadoopDruidIndexerConfig } } - throw new ISE("row[%s] doesn't fit in any shard[%s]", theMap, shards); + throw new ISE("row[%s] doesn't fit in any shard[%s]", inputRow, shards); } public Set getSegmentGranularIntervals() @@ -566,6 +593,11 @@ public class HadoopDruidIndexerConfig return new Path(makeIntermediatePath(), "segmentDescriptorInfo"); } + public Path makeGroupedDataDir() + { + return new Path(makeIntermediatePath(), "groupedData"); + } + public Path makeDescriptorInfoPath(DataSegment segment) { return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", ""))); @@ -626,10 +658,5 @@ public class HadoopDruidIndexerConfig final int nIntervals = getIntervals().size(); Preconditions.checkArgument(nIntervals > 0, "intervals.size()[%s] <= 0", nIntervals); - - if (partitionByDimension()) { - Preconditions.checkNotNull(partitionDimension); - Preconditions.checkNotNull(targetPartitionSize); - } } } diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerMapper.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerMapper.java new file mode 100644 index 00000000000..f49762cc4c6 --- /dev/null +++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerMapper.java @@ -0,0 +1,69 @@ +package com.metamx.druid.indexer; + +import com.metamx.common.RE; +import com.metamx.druid.indexer.data.StringInputRowParser; +import com.metamx.druid.input.InputRow; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.joda.time.DateTime; + +import java.io.IOException; + +public class HadoopDruidIndexerMapper extends Mapper +{ + private HadoopDruidIndexerConfig config; + private StringInputRowParser parser; + + @Override + protected void setup(Context context) + throws IOException, InterruptedException + { + config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); + parser = config.getParser(); + } + + public HadoopDruidIndexerConfig getConfig() + { + return config; + } + + public StringInputRowParser getParser() + { + return parser; + } + + @Override + protected void map( + LongWritable key, Text value, Context context + ) throws IOException, InterruptedException + { + try { + final InputRow inputRow; + try { + inputRow = parser.parse(value.toString()); + } + catch (IllegalArgumentException e) { + if (config.isIgnoreInvalidRows()) { + context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1); + return; // we're ignoring this invalid row + } else { + throw e; + } + } + 
+ if(config.getGranularitySpec().bucketInterval(new DateTime(inputRow.getTimestampFromEpoch())).isPresent()) { + innerMap(inputRow, value, context); + } + } + catch (RuntimeException e) { + throw new RE(e, "Failure on row[%s]", value); + } + } + + protected void innerMap(InputRow inputRow, Text text, Context context) + throws IOException, InterruptedException + { + // no-op, meant to be overridden + } +} diff --git a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java index 34d743fc9be..721158efb91 100644 --- a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java +++ b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java @@ -19,31 +19,25 @@ package com.metamx.druid.indexer; -import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.io.Closeables; import com.google.common.primitives.Longs; import com.metamx.common.ISE; -import com.metamx.common.RE; -import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; -import com.metamx.common.parsers.Parser; -import com.metamx.common.parsers.ParserUtils; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.client.DataSegment; import com.metamx.druid.index.v1.IncrementalIndex; import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.index.v1.IndexMerger; import com.metamx.druid.index.v1.MMappedIndex; +import com.metamx.druid.indexer.data.StringInputRowParser; import com.metamx.druid.indexer.rollup.DataRollupSpec; -import com.metamx.druid.input.MapBasedInputRow; +import com.metamx.druid.input.InputRow; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -53,13 +47,11 @@ import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3native.NativeS3FileSystem; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.InvalidJobConfException; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; @@ -68,7 +60,6 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.joda.time.DateTime; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; @@ -78,7 +69,6 @@ import java.net.URI; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; @@ -127,7 +117,7 @@ public class IndexGeneratorJob implements Jobby job.setMapperClass(IndexGeneratorMapper.class); job.setMapOutputValueClass(Text.class); - SortableBytes.useSortableBytesAsKey(job); + SortableBytes.useSortableBytesAsMapOutputKey(job); 
job.setNumReduceTasks(Iterables.size(config.getAllBuckets())); job.setPartitionerClass(IndexGeneratorPartitioner.class); @@ -159,75 +149,29 @@ public class IndexGeneratorJob implements Jobby } } - public static class IndexGeneratorMapper extends Mapper + public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper { - private HadoopDruidIndexerConfig config; - private Parser parser; - private Function timestampConverter; - @Override - protected void setup(Context context) - throws IOException, InterruptedException - { - config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); - parser = config.getDataSpec().getParser(); - timestampConverter = ParserUtils.createTimestampParser(config.getTimestampFormat()); - } - - @Override - protected void map( - LongWritable key, Text value, Context context + protected void innerMap( + InputRow inputRow, + Text text, + Context context ) throws IOException, InterruptedException { + // Group by bucket, sort by timestamp + final Optional bucket = getConfig().getBucket(inputRow); - try { - final Map values = parser.parse(value.toString()); - - final String tsStr = (String) values.get(config.getTimestampColumnName()); - final DateTime timestamp; - try { - timestamp = timestampConverter.apply(tsStr); - } - catch (IllegalArgumentException e) { - if (config.isIgnoreInvalidRows()) { - context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1); - return; // we're ignoring this invalid row - } else { - throw e; - } - } - - Optional bucket = config.getBucket( - Maps.transformEntries( - values, - new Maps.EntryTransformer() - { - @Override - public String transformEntry(@Nullable String key, @Nullable Object value) - { - if (key.equalsIgnoreCase(config.getTimestampColumnName())) { - return timestamp.toString(); - } - return value.toString(); - } - } - ) - ); - - if (bucket.isPresent()) { - // Group by bucket, sort by timestamp - context.write( - new SortableBytes( - bucket.get().toGroupKey(), - Longs.toByteArray(timestamp.getMillis()) - ).toBytesWritable(), - value - ); - } - } - catch (RuntimeException e) { - throw new RE(e, "Failure on row[%s]", value); + if(!bucket.isPresent()) { + throw new ISE("WTF?! No bucket found for row: %s", inputRow); } + + context.write( + new SortableBytes( + bucket.get().toGroupKey(), + Longs.toByteArray(inputRow.getTimestampFromEpoch()) + ).toBytesWritable(), + text + ); } } @@ -253,8 +197,7 @@ public class IndexGeneratorJob implements Jobby { private HadoopDruidIndexerConfig config; private List metricNames = Lists.newArrayList(); - private Function timestampConverter; - private Parser parser; + private StringInputRowParser parser; @Override protected void setup(Context context) @@ -265,8 +208,8 @@ public class IndexGeneratorJob implements Jobby for (AggregatorFactory factory : config.getRollupSpec().getAggs()) { metricNames.add(factory.getName().toLowerCase()); } - timestampConverter = ParserUtils.createTimestampParser(config.getTimestampFormat()); - parser = config.getDataSpec().getParser(); + + parser = config.getParser(); } @Override @@ -299,32 +242,10 @@ public class IndexGeneratorJob implements Jobby for (final Text value : values) { context.progress(); - Map event = parser.parse(value.toString()); - final long timestamp = timestampConverter.apply((String) event.get(config.getTimestampColumnName())) - .getMillis(); - List dimensionNames = - config.getDataSpec().hasCustomDimensions() ? 
- config.getDataSpec().getDimensions() : - Lists.newArrayList( - FunctionalIterable.create(event.keySet()) - .filter( - new Predicate() - { - @Override - public boolean apply(@Nullable String input) - { - return !(metricNames.contains(input.toLowerCase()) - || config.getTimestampColumnName() - .equalsIgnoreCase(input)); - } - } - ) - ); - allDimensionNames.addAll(dimensionNames); + final InputRow inputRow = parser.parse(value.toString()); + allDimensionNames.addAll(inputRow.getDimensions()); - int numRows = index.add( - new MapBasedInputRow(timestamp, dimensionNames, event) - ); + int numRows = index.add(inputRow); ++lineCount; if (numRows >= rollupSpec.rowFlushBoundary) { diff --git a/indexer/src/main/java/com/metamx/druid/indexer/SortableBytes.java b/indexer/src/main/java/com/metamx/druid/indexer/SortableBytes.java index 3abaa7951b8..394f9dacffb 100644 --- a/indexer/src/main/java/com/metamx/druid/indexer/SortableBytes.java +++ b/indexer/src/main/java/com/metamx/druid/indexer/SortableBytes.java @@ -102,7 +102,7 @@ public class SortableBytes ); } - public static void useSortableBytesAsKey(Job job) + public static void useSortableBytesAsMapOutputKey(Job job) { job.setMapOutputKeyClass(BytesWritable.class); job.setGroupingComparatorClass(SortableBytesGroupingComparator.class);
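
A note on the group-by pass: DeterminePartitionsGroupByMapper keys each row by its rollup-truncated timestamp plus a TreeMap of dimension names mapped to sorted value sets, and serializes that key as JSON. The sorted structures make the key canonical, so rows that would collapse into a single row in the final segment produce identical keys and are counted once by the dim-selection pass. The sketch below is a simplified stand-in, not the actual mapper: the hour truncation, dimension names, and timestamps are invented, and it compares keys by object equality rather than by serialized JSON, only to show why the ordering-insensitive key matters.

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Maps;

import java.util.List;
import java.util.Map;
import java.util.Set;

public class GroupKeySketch
{
  // Stand-in for getRollupSpec().getRollupGranularity().truncate(...): truncate to the hour.
  private static long truncateToHour(long timestampMillis)
  {
    return timestampMillis - (timestampMillis % (60L * 60 * 1000));
  }

  private static List<Object> groupKey(long timestampMillis, Map<String, List<String>> rawDims)
  {
    final Map<String, Set<String>> dims = Maps.newTreeMap();
    for (Map.Entry<String, List<String>> entry : rawDims.entrySet()) {
      // Sorted copies make the key independent of the order in which values arrive.
      dims.put(entry.getKey(), ImmutableSortedSet.copyOf(entry.getValue()));
    }
    return ImmutableList.<Object>of(truncateToHour(timestampMillis), dims);
  }

  public static void main(String[] args)
  {
    final Map<String, List<String>> rowA = Maps.newLinkedHashMap();
    rowA.put("country", ImmutableList.of("US"));
    rowA.put("tags", ImmutableList.of("a", "b"));

    final Map<String, List<String>> rowB = Maps.newLinkedHashMap();
    rowB.put("tags", ImmutableList.of("b", "a"));   // different insertion and value order
    rowB.put("country", ImmutableList.of("US"));

    // Same hour, same dimension values => same key, so the group-by reducer emits one record.
    System.out.println(groupKey(1356998460000L, rowA).equals(groupKey(1356998520000L, rowB))); // true
  }
}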
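
Between the dim-selection mapper, combiner, and reducer, per-value counts travel as tab-joined Text in the order (dim, numRows, value), and DimValueCount.fromText parses them with tabSplitter.limit(3). Putting the value last and limiting the split appears to be what keeps a value containing a tab intact. A minimal round-trip sketch, assuming the shared tabJoiner/tabSplitter are plain Joiner.on("\t") and Splitter.on("\t"); the dimension name and value here are invented.

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;

import java.util.Iterator;

public class DimValueCountRoundTrip
{
  private static final Joiner tabJoiner = Joiner.on("\t");
  private static final Splitter tabSplitter = Splitter.on("\t");

  public static void main(String[] args)
  {
    // Serialize as (dim, numRows, value); a count of -1 marks a "poisoned" (multi-valued) dimension.
    final String serialized = tabJoiner.join("country", String.valueOf(1), "New\tZealand");

    // limit(3) keeps everything after the second tab as the value, so tabs inside the value survive.
    final Iterator<String> splits = tabSplitter.limit(3).split(serialized).iterator();
    final String dim = splits.next();
    final int numRows = Integer.parseInt(splits.next());
    final String value = splits.next();

    System.out.printf("dim=%s numRows=%d value=%s%n", dim, numRows, value);
  }
}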
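
The CombiningIterable in combineRows merges adjacent counts for the same (dim, value) pair, and the merge keeps the result at -1 once either side is negative, so the "poison" emitted for a multi-valued dimension survives the combiner and reaches the reducer. The merge rule in isolation:

public class PoisonMergeSketch
{
  // Mirrors the BinaryFn in combineRows: negative counts mean "cannot partition on this dimension".
  static int merge(int a, int b)
  {
    return (a >= 0 && b >= 0) ? a + b : -1;
  }

  public static void main(String[] args)
  {
    System.out.println(merge(3, 4));   // 7
    System.out.println(merge(3, -1));  // -1: poisoned, dimension stays unusable for partitioning
    System.out.println(merge(-1, -1)); // -1
  }
}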
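
The dim-selection reducer walks each candidate dimension's sorted values and closes a shard whenever adding the next value would reach targetPartitionSize; a trailing shard smaller than SHARD_COMBINE_THRESHOLD (25% of the target) is folded back into its predecessor. Dimensions that end up with any shard above SHARD_OVERSIZE_THRESHOLD (150% of the target) are rejected, and the highest-cardinality survivor is chosen. The standalone sketch below reproduces only the cutting and combining steps, on invented value counts and a made-up target of 1,000,000 rows; it is a simplification, not the reducer itself (for instance, the real code emits a NoneShardSpec when a dimension never reaches the target, rather than a [null, null) range).

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ShardCutSketch
{
  private static final double SHARD_COMBINE_THRESHOLD = 0.25;

  static List<String[]> cut(Map<String, Integer> sortedValueCounts, int targetSize)
  {
    final List<String[]> ranges = new ArrayList<String[]>();
    String start = null;
    int rows = 0;

    for (Map.Entry<String, Integer> e : sortedValueCounts.entrySet()) {
      // Close the current shard just before a value that would push it to the target size.
      if (rows > 0 && rows + e.getValue() >= targetSize) {
        ranges.add(new String[]{start, e.getKey()});
        start = e.getKey();
        rows = 0;
      }
      rows += e.getValue();
    }

    if (rows > 0) {
      if (!ranges.isEmpty() && rows < targetSize * SHARD_COMBINE_THRESHOLD) {
        // Too small to stand alone: extend the previous shard to the end of the key space.
        final String previousStart = ranges.remove(ranges.size() - 1)[0];
        ranges.add(new String[]{previousStart, null});
      } else {
        ranges.add(new String[]{start, null});
      }
    }
    return ranges;
  }

  public static void main(String[] args)
  {
    final Map<String, Integer> counts = new LinkedHashMap<String, Integer>();
    counts.put("apple", 600000);
    counts.put("banana", 700000);
    counts.put("cherry", 900000);
    counts.put("durian", 100000);

    for (String[] range : cut(counts, 1000000)) {
      System.out.printf("[%s, %s)%n", range[0], range[1]);
    }
    // Prints [null, banana), [banana, cherry), [cherry, null): the small "durian" tail is
    // folded into the preceding shard instead of becoming an undersized shard of its own.
  }
}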