move IndexTask to use hashed partition; fixes #815

This commit is contained in:
fjy 2014-11-13 16:07:11 -08:00 committed by Xavier Léauté
parent ef62bccdec
commit 580e1172c1
4 changed files with 217 additions and 120 deletions

View File

@ -29,23 +29,23 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultiset;
import com.google.common.primitives.Ints;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.metamx.common.ISE;
import com.metamx.common.guava.Comparators;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.index.YeOldePlumberSchool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IOConfig;
import io.druid.segment.indexing.IngestionSpec;
@ -59,7 +59,6 @@ import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.SingleDimensionShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -67,7 +66,6 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.CopyOnWriteArrayList;
@ -76,6 +74,26 @@ public class IndexTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(IndexTask.class);
private static HashFunction hashFunction = Hashing.murmur3_128();
/**
* Should we index this inputRow? Decision is based on our interval and shardSpec.
*
* @param inputRow the row to check
*
* @return true or false
*/
private static boolean shouldIndex(
final ShardSpec shardSpec,
final Interval interval,
final InputRow inputRow,
final QueryGranularity rollupGran
)
{
return interval.contains(inputRow.getTimestampFromEpoch())
&& shardSpec.isInChunk(rollupGran.truncate(inputRow.getTimestampFromEpoch()), inputRow);
}
private static String makeId(String id, IndexIngestionSpec ingestionSchema)
{
if (id == null) {
@ -153,7 +171,7 @@ public class IndexTask extends AbstractFixedIntervalTask
for (final Interval bucket : validIntervals) {
final List<ShardSpec> shardSpecs;
if (targetPartitionSize > 0) {
shardSpecs = determinePartitions(bucket, targetPartitionSize);
shardSpecs = determinePartitions(bucket, targetPartitionSize, granularitySpec.getQueryGranularity());
} else {
int numShards = ingestionSchema.getTuningConfig().getNumShards();
if (numShards > 0) {
@ -200,7 +218,8 @@ public class IndexTask extends AbstractFixedIntervalTask
private List<ShardSpec> determinePartitions(
final Interval interval,
final int targetPartitionSize
final int targetPartitionSize,
final QueryGranularity queryGranularity
) throws IOException
{
log.info("Determining partitions for interval[%s] with targetPartitionSize[%d]", interval, targetPartitionSize);
@ -208,113 +227,49 @@ public class IndexTask extends AbstractFixedIntervalTask
final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
// The implementation of this determine partitions stuff is less than optimal. Should be done better.
// Blacklist dimensions that have multiple values per row
final Set<String> unusableDimensions = com.google.common.collect.Sets.newHashSet();
// Track values of all non-blacklisted dimensions
final Map<String, TreeMultiset<String>> dimensionValueMultisets = Maps.newHashMap();
// Use HLL to estimate number of rows
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
// Load data
try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) {
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
if (interval.contains(inputRow.getTimestampFromEpoch())) {
// Extract dimensions from event
for (final String dim : inputRow.getDimensions()) {
final List<String> dimValues = inputRow.getDimension(dim);
if (!unusableDimensions.contains(dim)) {
if (dimValues.size() == 1) {
// Track this value
TreeMultiset<String> dimensionValueMultiset = dimensionValueMultisets.get(dim);
if (dimensionValueMultiset == null) {
dimensionValueMultiset = TreeMultiset.create();
dimensionValueMultisets.put(dim, dimensionValueMultiset);
}
dimensionValueMultiset.add(dimValues.get(0));
} else {
// Only single-valued dimensions can be used for partitions
unusableDimensions.add(dim);
dimensionValueMultisets.remove(dim);
}
}
}
final List<Object> groupKey = Rows.toGroupKey(
queryGranularity.truncate(inputRow.getTimestampFromEpoch()),
inputRow
);
collector.add(
hashFunction.hashBytes(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey))
.asBytes()
);
}
}
}
final double numRows = collector.estimateCardinality();
log.info("Estimated approximately [%,f] rows of data.", numRows);
int numberOfShards = (int) Math.ceil(numRows / targetPartitionSize);
if ((double) numberOfShards > numRows) {
numberOfShards = (int) numRows;
}
log.info("Will require [%,d] shard(s).", numberOfShards);
// ShardSpecs we will return
final List<ShardSpec> shardSpecs = Lists.newArrayList();
// Select highest-cardinality dimension
Ordering<Map.Entry<String, TreeMultiset<String>>> byCardinalityOrdering = new Ordering<Map.Entry<String, TreeMultiset<String>>>()
{
@Override
public int compare(
Map.Entry<String, TreeMultiset<String>> left,
Map.Entry<String, TreeMultiset<String>> right
)
{
return Ints.compare(left.getValue().elementSet().size(), right.getValue().elementSet().size());
}
};
if (dimensionValueMultisets.isEmpty()) {
// No suitable partition dimension. We'll make one big segment and hope for the best.
log.info("No suitable partition dimension found");
if (numberOfShards == 1) {
shardSpecs.add(new NoneShardSpec());
} else {
// Find best partition dimension (heuristic: highest cardinality).
final Map.Entry<String, TreeMultiset<String>> partitionEntry =
byCardinalityOrdering.max(dimensionValueMultisets.entrySet());
final String partitionDim = partitionEntry.getKey();
final TreeMultiset<String> partitionDimValues = partitionEntry.getValue();
log.info(
"Partitioning on dimension[%s] with cardinality[%d] over rows[%d]",
partitionDim,
partitionDimValues.elementSet().size(),
partitionDimValues.size()
);
// Iterate over unique partition dimension values in sorted order
String currentPartitionStart = null;
int currentPartitionSize = 0;
for (final String partitionDimValue : partitionDimValues.elementSet()) {
currentPartitionSize += partitionDimValues.count(partitionDimValue);
if (currentPartitionSize >= targetPartitionSize) {
final ShardSpec shardSpec = new SingleDimensionShardSpec(
partitionDim,
currentPartitionStart,
partitionDimValue,
shardSpecs.size()
);
log.info("Adding shard: %s", shardSpec);
shardSpecs.add(shardSpec);
currentPartitionSize = partitionDimValues.count(partitionDimValue);
currentPartitionStart = partitionDimValue;
}
}
if (currentPartitionSize > 0) {
// One last shard to go
final ShardSpec shardSpec;
if (shardSpecs.isEmpty()) {
shardSpec = new NoneShardSpec();
} else {
shardSpec = new SingleDimensionShardSpec(
partitionDim,
currentPartitionStart,
null,
shardSpecs.size()
);
}
log.info("Adding shard: %s", shardSpec);
shardSpecs.add(shardSpec);
for (int i = 0; i < numberOfShards; ++i) {
shardSpecs.add(
new HashBasedNumberedShardSpec(
i,
numberOfShards,
HadoopDruidIndexerConfig.jsonMapper
)
);
}
}
@ -437,24 +392,6 @@ public class IndexTask extends AbstractFixedIntervalTask
return Iterables.getOnlyElement(pushedSegments);
}
/**
* Should we index this inputRow? Decision is based on our interval and shardSpec.
*
* @param inputRow the row to check
*
* @return true or false
*/
private static boolean shouldIndex(
final ShardSpec shardSpec,
final Interval interval,
final InputRow inputRow,
final QueryGranularity rollupGran
)
{
return interval.contains(inputRow.getTimestampFromEpoch())
&& shardSpec.isInChunk(rollupGran.truncate(inputRow.getTimestampFromEpoch()), inputRow);
}
public static class IndexIngestionSpec extends IngestionSpec<IndexIOConfig, IndexTuningConfig>
{
private final DataSchema dataSchema;

View File

@ -0,0 +1,159 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.task;
import com.google.api.client.util.Lists;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.List;
public class IndexTaskTest
{
@Test
public void testDeterminePartitions() throws Exception
{
File tmpFile = File.createTempFile("druid", "index");
tmpFile.deleteOnExit();
PrintWriter writer = new PrintWriter(tmpFile);
writer.println("2014-01-01T00:00:10Z,a,1");
writer.println("2014-01-01T01:00:20Z,b,1");
writer.println("2014-01-01T02:00:30Z,c,1");
writer.close();
IndexTask indexTask = new IndexTask(
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
"test",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto"
),
new DimensionsSpec(
Arrays.asList("ts"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
)
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
new UniformGranularitySpec(
Granularity.DAY,
QueryGranularity.MINUTE,
Arrays.asList(new Interval("2014/2015"))
)
),
new IndexTask.IndexIOConfig(
new LocalFirehoseFactory(
tmpFile.getParentFile(),
"druid*",
null
)
),
new IndexTask.IndexTuningConfig(
2,
0,
null
)
),
new DefaultObjectMapper()
);
final List<DataSegment> segments = Lists.newArrayList();
indexTask.run(
new TaskToolbox(
null, null, new TaskActionClientFactory()
{
@Override
public TaskActionClient create(Task task)
{
return new TaskActionClient()
{
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
if (taskAction instanceof LockListAction) {
return (RetType) Arrays.asList(
new TaskLock(
"", "", null, new DateTime().toString()
)
);
}
return null;
}
};
}
}, null, new DataSegmentPusher()
{
@Override
public String getPathForHadoop(String dataSource)
{
return null;
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
segments.add(segment);
return segment;
}
}, null, null, null, null, null, null, null, null, null, null, null
)
);
Assert.assertEquals(2, segments.size());
}
}

View File

@ -54,6 +54,7 @@ public class LocalFirehoseFactory implements FirehoseFactory<StringInputRowParse
public LocalFirehoseFactory(
@JsonProperty("baseDir") File baseDir,
@JsonProperty("filter") String filter,
// Backwards compatible
@JsonProperty("parser") StringInputRowParser parser
)
{