diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index 492317faf41..00da8cbc1cb 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -29,23 +29,23 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
-import com.google.common.collect.TreeMultiset;
-import com.google.common.primitives.Ints;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
 import com.metamx.common.ISE;
 import com.metamx.common.guava.Comparators;
 import com.metamx.common.logger.Logger;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.FirehoseFactory;
 import io.druid.data.input.InputRow;
+import io.druid.data.input.Rows;
 import io.druid.granularity.QueryGranularity;
+import io.druid.indexer.HadoopDruidIndexerConfig;
 import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.TaskToolbox;
 import io.druid.indexing.common.index.YeOldePlumberSchool;
-import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.IOConfig;
 import io.druid.segment.indexing.IngestionSpec;
@@ -59,7 +59,6 @@ import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.HashBasedNumberedShardSpec;
 import io.druid.timeline.partition.NoneShardSpec;
 import io.druid.timeline.partition.ShardSpec;
-import io.druid.timeline.partition.SingleDimensionShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
@@ -67,7 +66,6 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -76,6 +74,26 @@ public class IndexTask extends AbstractFixedIntervalTask
 {
   private static final Logger log = new Logger(IndexTask.class);
 
+  private static HashFunction hashFunction = Hashing.murmur3_128();
+
+  /**
+   * Should we index this inputRow? Decision is based on our interval and shardSpec.
+   *
+   * @param inputRow the row to check
+   *
+   * @return true or false
+   */
+  private static boolean shouldIndex(
+      final ShardSpec shardSpec,
+      final Interval interval,
+      final InputRow inputRow,
+      final QueryGranularity rollupGran
+  )
+  {
+    return interval.contains(inputRow.getTimestampFromEpoch())
+           && shardSpec.isInChunk(rollupGran.truncate(inputRow.getTimestampFromEpoch()), inputRow);
+  }
+
   private static String makeId(String id, IndexIngestionSpec ingestionSchema)
   {
     if (id == null) {
@@ -153,7 +171,7 @@ public class IndexTask extends AbstractFixedIntervalTask
     for (final Interval bucket : validIntervals) {
       final List<ShardSpec> shardSpecs;
       if (targetPartitionSize > 0) {
-        shardSpecs = determinePartitions(bucket, targetPartitionSize);
+        shardSpecs = determinePartitions(bucket, targetPartitionSize, granularitySpec.getQueryGranularity());
       } else {
         int numShards = ingestionSchema.getTuningConfig().getNumShards();
         if (numShards > 0) {
@@ -200,7 +218,8 @@
 
   private List<ShardSpec> determinePartitions(
       final Interval interval,
-      final int targetPartitionSize
+      final int targetPartitionSize,
+      final QueryGranularity queryGranularity
   ) throws IOException
   {
     log.info("Determining partitions for interval[%s] with targetPartitionSize[%d]", interval, targetPartitionSize);
@@ -208,113 +227,49 @@
     final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
 
     // The implementation of this determine partitions stuff is less than optimal. Should be done better.
-
-    // Blacklist dimensions that have multiple values per row
-    final Set<String> unusableDimensions = com.google.common.collect.Sets.newHashSet();
-    // Track values of all non-blacklisted dimensions
-    final Map<String, TreeMultiset<String>> dimensionValueMultisets = Maps.newHashMap();
+    // Use HLL to estimate number of rows
+    HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
 
     // Load data
     try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) {
       while (firehose.hasMore()) {
         final InputRow inputRow = firehose.nextRow();
         if (interval.contains(inputRow.getTimestampFromEpoch())) {
-          // Extract dimensions from event
-          for (final String dim : inputRow.getDimensions()) {
-            final List<String> dimValues = inputRow.getDimension(dim);
-            if (!unusableDimensions.contains(dim)) {
-              if (dimValues.size() == 1) {
-                // Track this value
-                TreeMultiset<String> dimensionValueMultiset = dimensionValueMultisets.get(dim);
-                if (dimensionValueMultiset == null) {
-                  dimensionValueMultiset = TreeMultiset.create();
-                  dimensionValueMultisets.put(dim, dimensionValueMultiset);
-                }
-                dimensionValueMultiset.add(dimValues.get(0));
-              } else {
-                // Only single-valued dimensions can be used for partitions
-                unusableDimensions.add(dim);
-                dimensionValueMultisets.remove(dim);
-              }
-            }
-          }
+          final List<Object> groupKey = Rows.toGroupKey(
+              queryGranularity.truncate(inputRow.getTimestampFromEpoch()),
+              inputRow
+          );
+          collector.add(
+              hashFunction.hashBytes(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey))
+                          .asBytes()
+          );
         }
       }
     }
 
+    final double numRows = collector.estimateCardinality();
+    log.info("Estimated approximately [%,f] rows of data.", numRows);
+
+    int numberOfShards = (int) Math.ceil(numRows / targetPartitionSize);
+    if ((double) numberOfShards > numRows) {
+      numberOfShards = (int) numRows;
+    }
+    log.info("Will require [%,d] shard(s).", numberOfShards);
+
     // ShardSpecs we will return
     final List<ShardSpec> shardSpecs = Lists.newArrayList();
 
-    // Select highest-cardinality dimension
-    Ordering<Map.Entry<String, TreeMultiset<String>>> byCardinalityOrdering = new Ordering<Map.Entry<String, TreeMultiset<String>>>()
-    {
-      @Override
-      public int compare(
-          Map.Entry<String, TreeMultiset<String>> left,
-          Map.Entry<String, TreeMultiset<String>> right
-      )
-      {
-        return Ints.compare(left.getValue().elementSet().size(), right.getValue().elementSet().size());
-      }
-    };
-
-    if (dimensionValueMultisets.isEmpty()) {
-      // No suitable partition dimension. We'll make one big segment and hope for the best.
-      log.info("No suitable partition dimension found");
+    if (numberOfShards == 1) {
       shardSpecs.add(new NoneShardSpec());
     } else {
-      // Find best partition dimension (heuristic: highest cardinality).
-      final Map.Entry<String, TreeMultiset<String>> partitionEntry =
-          byCardinalityOrdering.max(dimensionValueMultisets.entrySet());
-
-      final String partitionDim = partitionEntry.getKey();
-      final TreeMultiset<String> partitionDimValues = partitionEntry.getValue();
-
-      log.info(
-          "Partitioning on dimension[%s] with cardinality[%d] over rows[%d]",
-          partitionDim,
-          partitionDimValues.elementSet().size(),
-          partitionDimValues.size()
-      );
-
-      // Iterate over unique partition dimension values in sorted order
-      String currentPartitionStart = null;
-      int currentPartitionSize = 0;
-      for (final String partitionDimValue : partitionDimValues.elementSet()) {
-        currentPartitionSize += partitionDimValues.count(partitionDimValue);
-        if (currentPartitionSize >= targetPartitionSize) {
-          final ShardSpec shardSpec = new SingleDimensionShardSpec(
-              partitionDim,
-              currentPartitionStart,
-              partitionDimValue,
-              shardSpecs.size()
-          );
-
-          log.info("Adding shard: %s", shardSpec);
-          shardSpecs.add(shardSpec);
-
-          currentPartitionSize = partitionDimValues.count(partitionDimValue);
-          currentPartitionStart = partitionDimValue;
-        }
-      }
-
-      if (currentPartitionSize > 0) {
-        // One last shard to go
-        final ShardSpec shardSpec;
-
-        if (shardSpecs.isEmpty()) {
-          shardSpec = new NoneShardSpec();
-        } else {
-          shardSpec = new SingleDimensionShardSpec(
-              partitionDim,
-              currentPartitionStart,
-              null,
-              shardSpecs.size()
-          );
-        }
-
-        log.info("Adding shard: %s", shardSpec);
-        shardSpecs.add(shardSpec);
+      for (int i = 0; i < numberOfShards; ++i) {
+        shardSpecs.add(
+            new HashBasedNumberedShardSpec(
+                i,
+                numberOfShards,
+                HadoopDruidIndexerConfig.jsonMapper
+            )
+        );
       }
     }
@@ -437,24 +392,6 @@ public class IndexTask extends AbstractFixedIntervalTask
     return Iterables.getOnlyElement(pushedSegments);
   }
 
-  /**
-   * Should we index this inputRow? Decision is based on our interval and shardSpec.
-   *
-   * @param inputRow the row to check
-   *
-   * @return true or false
-   */
-  private static boolean shouldIndex(
-      final ShardSpec shardSpec,
-      final Interval interval,
-      final InputRow inputRow,
-      final QueryGranularity rollupGran
-  )
-  {
-    return interval.contains(inputRow.getTimestampFromEpoch())
-           && shardSpec.isInChunk(rollupGran.truncate(inputRow.getTimestampFromEpoch()), inputRow);
-  }
-
   public static class IndexIngestionSpec extends IngestionSpec<IndexIOConfig, IndexTuningConfig>
   {
     private final DataSchema dataSchema;
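The hunk above replaces the old single-dimension, cardinality-sorted partitioning with a HyperLogLog estimate of the post-rollup row count, and derives the shard count from targetPartitionSize. The self-contained sketch below reproduces just that arithmetic outside the task, under a few assumptions: a plain Jackson ObjectMapper stands in for HadoopDruidIndexerConfig.jsonMapper, the hard-coded group keys stand in for Rows.toGroupKey(truncatedTimestamp, inputRow), and the class name ShardCountEstimationSketch is invented for illustration.

// A minimal, self-contained sketch of the shard-count estimation introduced above.
// Assumes the Druid, Guava, and Jackson artifacts are on the classpath; the synthetic
// group keys stand in for Rows.toGroupKey(truncatedTimestamp, inputRow).
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;

import java.util.List;

public class ShardCountEstimationSketch
{
  private static final HashFunction HASH = Hashing.murmur3_128();
  private static final ObjectMapper JSON = new ObjectMapper();

  public static void main(String[] args) throws Exception
  {
    final int targetPartitionSize = 2;

    // Stand-ins for the per-row group keys (truncated timestamp plus dimension values).
    final List<List<Object>> groupKeys = ImmutableList.<List<Object>>of(
        ImmutableList.<Object>of(1388534400000L, "a"),
        ImmutableList.<Object>of(1388538000000L, "b"),
        ImmutableList.<Object>of(1388541600000L, "c")
    );

    // Feed a 128-bit hash of each serialized group key into the HLL sketch, as the task does.
    final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
    for (List<Object> groupKey : groupKeys) {
      collector.add(HASH.hashBytes(JSON.writeValueAsBytes(groupKey)).asBytes());
    }

    // Estimated distinct rows after rollup, turned into a hash-partitioned shard count.
    final double numRows = collector.estimateCardinality();
    int numberOfShards = (int) Math.ceil(numRows / targetPartitionSize);
    if ((double) numberOfShards > numRows) {
      numberOfShards = (int) numRows;
    }

    // With three distinct keys and a target of two rows per shard, this prints roughly 2 shards.
    System.out.printf("estimated rows=%.2f, shards=%d%n", numRows, numberOfShards);
  }
}

With three distinct group keys and targetPartitionSize = 2 the result works out to two shards, which is the same arithmetic the new IndexTaskTest below relies on when it asserts that two segments are pushed.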
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
new file mode 100644
index 00000000000..06b161951da
--- /dev/null
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
@@ -0,0 +1,159 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.google.api.client.util.Lists;
+import com.metamx.common.Granularity;
+import io.druid.data.input.impl.CSVParseSpec;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.SpatialDimensionSchema;
+import io.druid.data.input.impl.StringInputRowParser;
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.granularity.QueryGranularity;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.LockListAction;
+import io.druid.indexing.common.actions.TaskAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.actions.TaskActionClientFactory;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
+import io.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.List;
+
+public class IndexTaskTest
+{
+  @Test
+  public void testDeterminePartitions() throws Exception
+  {
+    File tmpFile = File.createTempFile("druid", "index");
+    tmpFile.deleteOnExit();
+
+    PrintWriter writer = new PrintWriter(tmpFile);
+    writer.println("2014-01-01T00:00:10Z,a,1");
+    writer.println("2014-01-01T01:00:20Z,b,1");
+    writer.println("2014-01-01T02:00:30Z,c,1");
+    writer.close();
+
+    IndexTask indexTask = new IndexTask(
+        null,
+        new IndexTask.IndexIngestionSpec(
+            new DataSchema(
+                "test",
+                new StringInputRowParser(
+                    new CSVParseSpec(
+                        new TimestampSpec(
+                            "ts",
+                            "auto"
+                        ),
+                        new DimensionsSpec(
+                            Arrays.asList("ts"),
+                            Lists.<String>newArrayList(),
+                            Lists.<SpatialDimensionSchema>newArrayList()
+                        ),
+                        null,
+                        Arrays.asList("ts", "dim", "val")
+                    )
+                ),
+                new AggregatorFactory[]{
+                    new LongSumAggregatorFactory("val", "val")
+                },
+                new UniformGranularitySpec(
+                    Granularity.DAY,
+                    QueryGranularity.MINUTE,
+                    Arrays.asList(new Interval("2014/2015"))
+                )
+            ),
+            new IndexTask.IndexIOConfig(
+                new LocalFirehoseFactory(
+                    tmpFile.getParentFile(),
+                    "druid*",
+                    null
+                )
+            ),
+            new IndexTask.IndexTuningConfig(
+                2,
+                0,
+                null
+            )
+        ),
+        new DefaultObjectMapper()
+    );
+
+    final List<DataSegment> segments = Lists.newArrayList();
+
+    indexTask.run(
+        new TaskToolbox(
+            null, null, new TaskActionClientFactory()
+        {
+          @Override
+          public TaskActionClient create(Task task)
+          {
+            return new TaskActionClient()
+            {
+              @Override
+              public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
+              {
+                if (taskAction instanceof LockListAction) {
+                  return (RetType) Arrays.asList(
+                      new TaskLock(
+                          "", "", null, new DateTime().toString()
+                      )
+                  );
+                }
+                return null;
+              }
+            };
+          }
+        }, null, new DataSegmentPusher()
+        {
+          @Override
+          public String getPathForHadoop(String dataSource)
+          {
+            return null;
+          }
+
+          @Override
+          public DataSegment push(File file, DataSegment segment) throws IOException
+          {
+            segments.add(segment);
+            return segment;
+          }
+        }, null, null, null, null, null, null, null, null, null, null, null
+        )
+    );
+
+    Assert.assertEquals(2, segments.size());
+  }
+}
diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java
index dca9f440bb3..d2dcf606ee1 100644
--- a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java
+++ b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java
@@ -54,6 +54,7 @@ public class LocalFirehoseFactory implements FirehoseFactory
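For context on how rows are then routed during indexing: determinePartitions creates one HashBasedNumberedShardSpec per shard, and the relocated shouldIndex keeps a row only when shardSpec.isInChunk(...) accepts its rollup-truncated timestamp and group key, so the per-shard passes collectively cover every row exactly once. The sketch below mimics that invariant with a murmur3 hash taken modulo the shard count; the modulo scheme and the HashRoutingSketch/route names are illustrative assumptions, not the shard spec's actual implementation.

// Illustration only: routes each group key to one of N shards via a murmur3 hash modulo N,
// mirroring the intent of HashBasedNumberedShardSpec.isInChunk(...). The modulo scheme here
// is an assumption for demonstration; the real membership test lives inside the shard spec.
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class HashRoutingSketch
{
  static int route(String groupKey, int numberOfShards)
  {
    final int hash = Hashing.murmur3_128().hashString(groupKey, StandardCharsets.UTF_8).asInt();
    // Math.abs would overflow on Integer.MIN_VALUE, so mask the sign bit instead.
    return (hash & Integer.MAX_VALUE) % numberOfShards;
  }

  public static void main(String[] args)
  {
    final int numberOfShards = 2;
    final List<String> groupKeys = Arrays.asList(
        "2014-01-01T00:00Z|a",
        "2014-01-01T01:00Z|b",
        "2014-01-01T02:00Z|c"
    );
    for (String key : groupKeys) {
      // Exactly one shard "accepts" each key, so the per-shard index passes cover every row once.
      System.out.printf("%s -> shard %d of %d%n", key, route(key, numberOfShards), numberOfShards);
    }
  }
}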