From 36fc85736c3f4233dee9f28894b95e82467c34c6 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Tue, 8 Jul 2014 18:01:31 +0530 Subject: [PATCH 1/4] Add ShardSpec Lookup Optimize choosing shardSpec for Hash Partitions --- .../indexer/HadoopDruidIndexerConfig.java | 56 +++++++++++++------ .../partition/HashBasedNumberedShardSpec.java | 20 +++++++ .../timeline/partition/LinearShardSpec.java | 16 ++++++ .../timeline/partition/NumberedShardSpec.java | 16 ++++++ .../partition/SingleDimensionShardSpec.java | 19 +++++++ 5 files changed, 110 insertions(+), 17 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 8f0f2d43c34..11acfd8df6e 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.util.Maps; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Optional; @@ -30,6 +31,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.inject.Binder; import com.google.inject.Injector; import com.google.inject.Key; @@ -51,6 +53,7 @@ import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.server.DruidNode; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.ShardSpec; +import io.druid.timeline.partition.ShardSpecLookup; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -60,6 +63,7 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.format.ISODateTimeFormat; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.nio.charset.Charset; @@ -169,6 +173,8 @@ public class HadoopDruidIndexerConfig private volatile HadoopIngestionSpec schema; private volatile PathSpec pathSpec; private volatile ColumnConfig columnConfig; + private volatile Map shardSpecLookups = Maps.newHashMap(); + private volatile Map hadoopShardSpecLookup = Maps.newHashMap(); @JsonCreator public HadoopDruidIndexerConfig( @@ -178,6 +184,30 @@ public class HadoopDruidIndexerConfig this.columnConfig = columnConfig; this.schema = schema; this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class); + for (Map.Entry> entry : schema.getTuningConfig().getShardSpecs().entrySet()) { + if (entry.getValue() == null || entry.getValue().isEmpty()) { + continue; + } + final ShardSpec actualSpec = entry.getValue().get(0).getActualSpec(); + shardSpecLookups.put( + entry.getKey(), actualSpec.getLookup( + Lists.transform( + entry.getValue(), new Function() + { + @Nullable + @Override + public ShardSpec apply(@Nullable HadoopyShardSpec input) + { + return input.getActualSpec(); + } + } + ) + ) + ); + for (HadoopyShardSpec hadoopyShardSpec : entry.getValue()) { + hadoopShardSpecLookup.put(hadoopyShardSpec.getActualSpec(), hadoopyShardSpec); + } + } } @JsonProperty @@ -306,25 +336,17 @@ public class HadoopDruidIndexerConfig return Optional.absent(); } - final List shards = schema.getTuningConfig().getShardSpecs().get(timeBucket.get().getStart()); - if (shards == null || shards.isEmpty()) { - return Optional.absent(); - } + final ShardSpec actualSpec = shardSpecLookups.get(timeBucket.get().getStart()).getShardSpec(inputRow); + final HadoopyShardSpec hadoopyShardSpec = hadoopShardSpecLookup.get(actualSpec); - for (final HadoopyShardSpec hadoopyShardSpec : shards) { - final ShardSpec actualSpec = hadoopyShardSpec.getActualSpec(); - if (actualSpec.isInChunk(inputRow)) { - return Optional.of( - new Bucket( - hadoopyShardSpec.getShardNum(), - timeBucket.get().getStart(), - actualSpec.getPartitionNum() - ) - ); - } - } + return Optional.of( + new Bucket( + hadoopyShardSpec.getShardNum(), + timeBucket.get().getStart(), + actualSpec.getPartitionNum() + ) + ); - throw new ISE("row[%s] doesn't fit in any shard[%s]", inputRow, shards); } public Optional> getSegmentGranularIntervals() diff --git a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java index 5110f886601..ac8b570578b 100644 --- a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java @@ -25,12 +25,14 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.repackaged.com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import io.druid.data.input.InputRow; import io.druid.data.input.Rows; import java.util.List; +import java.util.Map; public class HashBasedNumberedShardSpec extends NumberedShardSpec { @@ -74,4 +76,22 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec '}'; } + @Override + public ShardSpecLookup getLookup(final List shardSpecs) + { + final ImmutableMap.Builder shardSpecsMapBuilder = ImmutableMap.builder(); + for (ShardSpec spec : shardSpecs) { + shardSpecsMapBuilder.put(spec.getPartitionNum(), spec); + } + final Map shardSpecMap = shardSpecsMapBuilder.build(); + + return new ShardSpecLookup() + { + @Override + public ShardSpec getShardSpec(InputRow row) + { + return shardSpecMap.get((long) hash(row) % getPartitions()); + } + }; + } } \ No newline at end of file diff --git a/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java b/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java index ea7d3256229..6f9dd6258e0 100644 --- a/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java @@ -24,6 +24,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import io.druid.data.input.InputRow; +import java.util.List; +import java.util.Set; + public class LinearShardSpec implements ShardSpec { private int partitionNum; @@ -42,6 +45,19 @@ public class LinearShardSpec implements ShardSpec return partitionNum; } + @Override + public ShardSpecLookup getLookup(final List shardSpecs) + { + return new ShardSpecLookup() + { + @Override + public ShardSpec getShardSpec(InputRow row) + { + return shardSpecs.get(0); + } + }; + } + @Override public PartitionChunk createChunk(T obj) { return new LinearPartitionChunk(partitionNum, obj); diff --git a/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java index 73a3437a80a..683836d95fb 100644 --- a/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java @@ -25,6 +25,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import io.druid.data.input.InputRow; +import java.util.List; +import java.util.Set; + public class NumberedShardSpec implements ShardSpec { @JsonIgnore @@ -52,6 +55,19 @@ public class NumberedShardSpec implements ShardSpec return partitionNum; } + @Override + public ShardSpecLookup getLookup(final List shardSpecs) + { + return new ShardSpecLookup() + { + @Override + public ShardSpec getShardSpec(InputRow row) + { + return shardSpecs.get(0); + } + }; + } + @JsonProperty("partitions") public int getPartitions() { diff --git a/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java b/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java index 3cb3e5a72ba..197d5e6129f 100644 --- a/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java @@ -20,6 +20,7 @@ package io.druid.timeline.partition; import com.fasterxml.jackson.annotation.JsonProperty; +import com.metamx.common.ISE; import io.druid.data.input.InputRow; import java.util.List; @@ -94,6 +95,24 @@ public class SingleDimensionShardSpec implements ShardSpec return partitionNum; } + @Override + public ShardSpecLookup getLookup(final List shardSpecs) + { + return new ShardSpecLookup() + { + @Override + public ShardSpec getShardSpec(InputRow row) + { + for (ShardSpec spec : shardSpecs) { + if (spec.isInChunk(row)) { + return spec; + } + } + throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs); + } + }; + } + public void setPartitionNum(int partitionNum) { this.partitionNum = partitionNum; From fa43049240e69b9a4dfb344b97f762d57fabd3fe Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 10 Jul 2014 11:48:46 +0530 Subject: [PATCH 2/4] review comments & pom changes --- .../indexer/HadoopDruidIndexerConfig.java | 7 ++----- pom.xml | 2 +- .../partition/HashBasedNumberedShardSpec.java | 21 ++++++++++++------- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 11acfd8df6e..1ff6e5a4482 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -36,17 +36,16 @@ import com.google.inject.Binder; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; -import com.metamx.common.ISE; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; import io.druid.common.utils.JodaUtils; import io.druid.data.input.InputRow; import io.druid.data.input.impl.StringInputRowParser; +import io.druid.guice.GuiceInjectors; import io.druid.guice.JsonConfigProvider; import io.druid.guice.annotations.Self; import io.druid.indexer.partitions.PartitionsSpec; import io.druid.indexer.path.PathSpec; -import io.druid.guice.GuiceInjectors; import io.druid.initialization.Initialization; import io.druid.segment.column.ColumnConfig; import io.druid.segment.indexing.granularity.GranularitySpec; @@ -63,7 +62,6 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.format.ISODateTimeFormat; -import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.nio.charset.Charset; @@ -194,9 +192,8 @@ public class HadoopDruidIndexerConfig Lists.transform( entry.getValue(), new Function() { - @Nullable @Override - public ShardSpec apply(@Nullable HadoopyShardSpec input) + public ShardSpec apply(HadoopyShardSpec input) { return input.getActualSpec(); } diff --git a/pom.xml b/pom.xml index 2b7f88276e8..7d270176674 100644 --- a/pom.xml +++ b/pom.xml @@ -41,7 +41,7 @@ UTF-8 0.26.5 2.5.0 - 0.2.4 + 0.2.5 diff --git a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java index ac8b570578b..afcb2feb922 100644 --- a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java @@ -25,14 +25,14 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.repackaged.com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import io.druid.data.input.InputRow; import io.druid.data.input.Rows; +import java.util.Collections; +import java.util.Comparator; import java.util.List; -import java.util.Map; public class HashBasedNumberedShardSpec extends NumberedShardSpec { @@ -79,18 +79,25 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec @Override public ShardSpecLookup getLookup(final List shardSpecs) { - final ImmutableMap.Builder shardSpecsMapBuilder = ImmutableMap.builder(); - for (ShardSpec spec : shardSpecs) { - shardSpecsMapBuilder.put(spec.getPartitionNum(), spec); + // Sort on basis of partitionNumber + Collections.sort( + shardSpecs, new Comparator() + { + @Override + public int compare(ShardSpec o1, ShardSpec o2) + { + return Integer.compare(o1.getPartitionNum(), o2.getPartitionNum()); + } } - final Map shardSpecMap = shardSpecsMapBuilder.build(); + ); return new ShardSpecLookup() { @Override public ShardSpec getShardSpec(InputRow row) { - return shardSpecMap.get((long) hash(row) % getPartitions()); + int index = (int) ((long) hash(row)) % getPartitions(); + return shardSpecs.get(index); } }; } From a12688bc8bab700899af0a525a76c22d79aceab8 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 10 Jul 2014 12:21:27 +0530 Subject: [PATCH 3/4] fix partitionNum calculation & remove extra sorting --- pom.xml | 2 +- .../partition/HashBasedNumberedShardSpec.java | 16 +--------------- .../shard/HashBasedNumberedShardSpecTest.java | 12 ++++++++++++ 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/pom.xml b/pom.xml index 7d270176674..dfa203ec905 100644 --- a/pom.xml +++ b/pom.xml @@ -41,7 +41,7 @@ UTF-8 0.26.5 2.5.0 - 0.2.5 + 0.2.5-SNAPSHOT diff --git a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java index afcb2feb922..8f347ee6cfd 100644 --- a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java @@ -30,8 +30,6 @@ import com.google.common.hash.Hashing; import io.druid.data.input.InputRow; import io.druid.data.input.Rows; -import java.util.Collections; -import java.util.Comparator; import java.util.List; public class HashBasedNumberedShardSpec extends NumberedShardSpec @@ -79,24 +77,12 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec @Override public ShardSpecLookup getLookup(final List shardSpecs) { - // Sort on basis of partitionNumber - Collections.sort( - shardSpecs, new Comparator() - { - @Override - public int compare(ShardSpec o1, ShardSpec o2) - { - return Integer.compare(o1.getPartitionNum(), o2.getPartitionNum()); - } - } - ); - return new ShardSpecLookup() { @Override public ShardSpec getShardSpec(InputRow row) { - int index = (int) ((long) hash(row)) % getPartitions(); + int index = Math.abs(hash(row) % getPartitions()); return shardSpecs.get(index); } }; diff --git a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java b/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java index bfd573dae89..5f176015c2a 100644 --- a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java +++ b/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java @@ -194,4 +194,16 @@ public class HashBasedNumberedShardSpecTest return 0; } } + + @Test + public void testValidity(){ + for(int i=Integer.MIN_VALUE;i<=Integer.MAX_VALUE;i++){ + { + int partitionNum = Math.abs((int) ((long) i % 2)); + if(partitionNum != 0 && partitionNum != 1){ + throw new ISE("for i "+ i+ "partitionNum "+ partitionNum); + } + } + } + } } From a31376ee8371a62a092662c04ddde3a360d0d009 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 10 Jul 2014 12:23:20 +0530 Subject: [PATCH 4/4] point to correct version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index dfa203ec905..7d270176674 100644 --- a/pom.xml +++ b/pom.xml @@ -41,7 +41,7 @@ UTF-8 0.26.5 2.5.0 - 0.2.5-SNAPSHOT + 0.2.5