From 36fc85736c3f4233dee9f28894b95e82467c34c6 Mon Sep 17 00:00:00 2001
From: nishantmonu51
Date: Tue, 8 Jul 2014 18:01:31 +0530
Subject: [PATCH] Add ShardSpec Lookup

Optimize choosing shardSpec for Hash Partitions
---
Review notes (text between the "---" line and the diffstat is ignored by
`git am` and does not become part of the commit message):

- HadoopDruidIndexerConfig.getBucket(): the constructor skips time buckets
  whose shard list is null or empty, so shardSpecLookups.get(...) can return
  null. The lookup is now null-checked and Optional.absent() is returned, as
  the removed code did. A row that resolves to no shard raises an ISE again
  instead of a NullPointerException.
- HashBasedNumberedShardSpec.getLookup(): the map is keyed by
  spec.getPartitionNum() (an int, boxed to Integer) but was queried with a
  long expression (boxed to Long), which can never equal an Integer key. The
  index is now an int. It is also normalized into [0, partitions) because the
  remainder of a negative dividend is negative in Java; hash(row) is presumably
  a signed int hash -- confirm against its declaration.
- HadoopDruidIndexerConfig: Maps is imported from Guava
  (com.google.common.collect) like the other collection helpers in the file,
  instead of from com.google.api.client.util.
- LinearShardSpec / NumberedShardSpec: dropped the added java.util.Set import,
  which nothing in this patch uses.
- NOTE(review): this patch text was rebuilt from a whitespace-collapsed copy in
  which generic type arguments had been stripped. Indentation, blank lines and
  the restored type arguments are best-effort, and the post-image blob ids on
  the "index" lines predate the fixes above. Regenerate the patch with
  `git format-patch` from a real checkout before applying it.

 .../indexer/HadoopDruidIndexerConfig.java     | 60 ++++++++++++-------
 .../partition/HashBasedNumberedShardSpec.java | 24 ++++++++
 .../timeline/partition/LinearShardSpec.java   | 15 +++++
 .../timeline/partition/NumberedShardSpec.java | 15 +++++
 .../partition/SingleDimensionShardSpec.java   | 19 ++++++
 5 files changed, 118 insertions(+), 15 deletions(-)

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
index 8f0f2d43c34..11acfd8df6e 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
@@ -30,6 +30,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.inject.Binder;
 import com.google.inject.Injector;
 import com.google.inject.Key;
@@ -51,6 +53,7 @@ import io.druid.segment.indexing.granularity.GranularitySpec;
 import io.druid.server.DruidNode;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.ShardSpec;
+import io.druid.timeline.partition.ShardSpecLookup;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -60,6 +63,7 @@ import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.format.ISODateTimeFormat;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -169,6 +173,8 @@ public class HadoopDruidIndexerConfig
   private volatile HadoopIngestionSpec schema;
   private volatile PathSpec pathSpec;
   private volatile ColumnConfig columnConfig;
+  private volatile Map<DateTime, ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
+  private volatile Map<ShardSpec, HadoopyShardSpec> hadoopShardSpecLookup = Maps.newHashMap();
 
   @JsonCreator
   public HadoopDruidIndexerConfig(
@@ -178,6 +184,30 @@ public class HadoopDruidIndexerConfig
     this.columnConfig = columnConfig;
     this.schema = schema;
     this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
+    for (Map.Entry<DateTime, List<HadoopyShardSpec>> entry : schema.getTuningConfig().getShardSpecs().entrySet()) {
+      if (entry.getValue() == null || entry.getValue().isEmpty()) {
+        continue;
+      }
+      final ShardSpec actualSpec = entry.getValue().get(0).getActualSpec();
+      shardSpecLookups.put(
+          entry.getKey(), actualSpec.getLookup(
+          Lists.transform(
+              entry.getValue(), new Function<HadoopyShardSpec, ShardSpec>()
+          {
+            @Nullable
+            @Override
+            public ShardSpec apply(@Nullable HadoopyShardSpec input)
+            {
+              return input.getActualSpec();
+            }
+          }
+          )
+      )
+      );
+      for (HadoopyShardSpec hadoopyShardSpec : entry.getValue()) {
+        hadoopShardSpecLookup.put(hadoopyShardSpec.getActualSpec(), hadoopyShardSpec);
+      }
+    }
   }
 
   @JsonProperty
@@ -306,25 +336,25 @@ public class HadoopDruidIndexerConfig
       return Optional.absent();
     }
 
-    final List<HadoopyShardSpec> shards = schema.getTuningConfig().getShardSpecs().get(timeBucket.get().getStart());
-    if (shards == null || shards.isEmpty()) {
+    final ShardSpecLookup lookup = shardSpecLookups.get(timeBucket.get().getStart());
+    if (lookup == null) {
+      // No lookup is registered for buckets whose shard list was null or empty (skipped in the constructor).
       return Optional.absent();
     }
 
-    for (final HadoopyShardSpec hadoopyShardSpec : shards) {
-      final ShardSpec actualSpec = hadoopyShardSpec.getActualSpec();
-      if (actualSpec.isInChunk(inputRow)) {
-        return Optional.of(
-            new Bucket(
-                hadoopyShardSpec.getShardNum(),
-                timeBucket.get().getStart(),
-                actualSpec.getPartitionNum()
-            )
-        );
-      }
-    }
+    final ShardSpec actualSpec = lookup.getShardSpec(inputRow);
+    final HadoopyShardSpec hadoopyShardSpec = hadoopShardSpecLookup.get(actualSpec);
+    if (actualSpec == null || hadoopyShardSpec == null) {
+      throw new ISE("row[%s] doesn't fit in any shard for time bucket[%s]", inputRow, timeBucket.get());
+    }
 
-    throw new ISE("row[%s] doesn't fit in any shard[%s]", inputRow, shards);
+    return Optional.of(
+        new Bucket(
+            hadoopyShardSpec.getShardNum(),
+            timeBucket.get().getStart(),
+            actualSpec.getPartitionNum()
+        )
+    );
   }
 
   public Optional<Set<Interval>> getSegmentGranularIntervals()
diff --git a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java
index 5110f886601..ac8b570578b 100644
--- a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java
+++ b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java
@@ -25,12 +25,14 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.api.client.repackaged.com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.Rows;
 
 import java.util.List;
+import java.util.Map;
 
 public class HashBasedNumberedShardSpec extends NumberedShardSpec
 {
@@ -74,4 +76,26 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
            '}';
   }
 
+  @Override
+  public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
+  {
+    final ImmutableMap.Builder<Integer, ShardSpec> shardSpecsMapBuilder = ImmutableMap.builder();
+    for (ShardSpec spec : shardSpecs) {
+      shardSpecsMapBuilder.put(spec.getPartitionNum(), spec);
+    }
+    final Map<Integer, ShardSpec> shardSpecMap = shardSpecsMapBuilder.build();
+
+    return new ShardSpecLookup()
+    {
+      @Override
+      public ShardSpec getShardSpec(InputRow row)
+      {
+        // The map keys are boxed Integers, so the index must be an int: a Long key never matches.
+        // Normalize into [0, partitions) so that a negative hash still selects a partition.
+        final long partitions = getPartitions();
+        final int index = (int) ((((long) hash(row)) % partitions + partitions) % partitions);
+        return shardSpecMap.get(index);
+      }
+    };
+  }
 }
\ No newline at end of file
diff --git a/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java b/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java
index ea7d3256229..6f9dd6258e0 100644
--- a/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java
+++ b/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import io.druid.data.input.InputRow;
 
+import java.util.List;
+
 public class LinearShardSpec implements ShardSpec
 {
   private int partitionNum;
@@ -42,6 +44,19 @@ public class LinearShardSpec implements ShardSpec
     return partitionNum;
   }
 
+  @Override
+  public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
+  {
+    return new ShardSpecLookup()
+    {
+      @Override
+      public ShardSpec getShardSpec(InputRow row)
+      {
+        return shardSpecs.get(0);
+      }
+    };
+  }
+
   @Override
   public <T> PartitionChunk<T> createChunk(T obj) {
     return new LinearPartitionChunk<T>(partitionNum, obj);
diff --git a/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java
index 73a3437a80a..683836d95fb 100644
--- a/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java
+++ b/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java
@@ -25,6 +25,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import io.druid.data.input.InputRow;
 
+import java.util.List;
+
 public class NumberedShardSpec implements ShardSpec
 {
   @JsonIgnore
@@ -52,6 +54,19 @@ public class NumberedShardSpec implements ShardSpec
     return partitionNum;
   }
 
+  @Override
+  public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
+  {
+    return new ShardSpecLookup()
+    {
+      @Override
+      public ShardSpec getShardSpec(InputRow row)
+      {
+        return shardSpecs.get(0);
+      }
+    };
+  }
+
   @JsonProperty("partitions")
   public int getPartitions()
   {
diff --git a/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java b/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java
index 3cb3e5a72ba..197d5e6129f 100644
--- a/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java
+++ b/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java
@@ -20,6 +20,7 @@
 package io.druid.timeline.partition;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.metamx.common.ISE;
 import io.druid.data.input.InputRow;
 
 import java.util.List;
@@ -94,6 +95,24 @@ public class SingleDimensionShardSpec implements ShardSpec
     return partitionNum;
   }
 
+  @Override
+  public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
+  {
+    return new ShardSpecLookup()
+    {
+      @Override
+      public ShardSpec getShardSpec(InputRow row)
+      {
+        for (ShardSpec spec : shardSpecs) {
+          if (spec.isInChunk(row)) {
+            return spec;
+          }
+        }
+        throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
+      }
+    };
+  }
+
   public void setPartitionNum(int partitionNum)
   {
     this.partitionNum = partitionNum;