From 0d8c1ffe54c16648af34054b34d66eaa7d74f5d8 Mon Sep 17 00:00:00 2001
From: nishantmonu51
Date: Wed, 23 Apr 2014 03:30:30 +0530
Subject: [PATCH] review comments and add partitioner

---
 .../java/io/druid/indexer/DbUpdaterJob.java   |  3 +-
 .../indexer/DetermineHashedPartitionsJob.java | 63 ++++++++++++++-----
 .../HadoopDruidDetermineConfigurationJob.java |  2 +-
 .../partitions/AbstractPartitionsSpec.java    | 17 ++---
 .../partitions/HashedPartitionsSpec.java      |  4 +-
 .../indexer/partitions/PartitionsSpec.java    |  2 +-
 .../partitions/RandomPartitionsSpec.java      |  4 +-
 .../indexer/HadoopDruidIndexerConfigTest.java |  4 +-
 .../segment/realtime/DbSegmentPublisher.java  |  3 +-
 9 files changed, 65 insertions(+), 37 deletions(-)

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java
index 36b67e10c05..347fa85fe2b 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
 import com.metamx.common.logger.Logger;
 import io.druid.db.DbConnector;
 import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
@@ -75,7 +76,7 @@ public class DbUpdaterJob implements Jobby
                 .put("created_date", new DateTime().toString())
                 .put("start", segment.getInterval().getStart().toString())
                 .put("end", segment.getInterval().getEnd().toString())
-                .put("partitioned", segment.getShardSpec().getPartitionNum())
+                .put("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 0 : 1)
                 .put("version", segment.getVersion())
                 .put("used", true)
                 .put("payload", HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(segment))
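Note on the "partitioned" column change above (DbSegmentPublisher at the end of this patch gets the same change): the column now records whether the segment is sharded at all, instead of echoing the shard's partition number. A minimal sketch of the new semantics, using a hypothetical helper name (PartitionedFlag is illustrative, not part of the patch):

    import io.druid.timeline.partition.NoneShardSpec;
    import io.druid.timeline.partition.ShardSpec;

    public class PartitionedFlag
    {
      // 0 = unpartitioned segment (NoneShardSpec), 1 = any real sharding scheme.
      // The previous value, getPartitionNum(), is also 0 for the first chunk of a
      // partitioned interval, so it could not distinguish the two cases.
      public static int of(ShardSpec shardSpec)
      {
        return (shardSpec instanceof NoneShardSpec) ? 0 : 1;
      }
    }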
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index e3782916902..86534bb027e 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -37,6 +37,7 @@ import io.druid.indexer.granularity.UniformGranularitySpec;
 import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
 import io.druid.timeline.partition.HashBasedNumberedShardSpec;
 import io.druid.timeline.partition.NoneShardSpec;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +46,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -65,7 +67,6 @@ import java.util.Set;
  */
 public class DetermineHashedPartitionsJob implements Jobby
 {
-  private static final int MAX_SHARDS = 128;
   private static final Logger log = new Logger(DetermineHashedPartitionsJob.class);
   private final HadoopDruidIndexerConfig config;
 
@@ -98,6 +99,7 @@ public class DetermineHashedPartitionsJob implements Jobby
       groupByJob.setOutputKeyClass(NullWritable.class);
       groupByJob.setOutputValueClass(NullWritable.class);
       groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
+      groupByJob.setPartitionerClass(DetermineHashedPartitionsPartitioner.class);
       if (!config.getSegmentGranularIntervals().isPresent()) {
         groupByJob.setNumReduceTasks(1);
       } else {
@@ -150,14 +152,6 @@ public class DetermineHashedPartitionsJob implements Jobby
           );
           int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
-          if (numberOfShards > MAX_SHARDS) {
-            throw new ISE(
-                "Number of shards [%d] exceed the maximum limit of [%d], either targetPartitionSize is too low or data volume is too high",
-                numberOfShards,
-                MAX_SHARDS
-            );
-          }
-
           List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
           if (numberOfShards == 1) {
             actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
@@ -314,13 +308,6 @@ public class DetermineHashedPartitionsJob implements Jobby
       }
     }
 
-    private byte[] getDataBytes(BytesWritable writable)
-    {
-      byte[] rv = new byte[writable.getLength()];
-      System.arraycopy(writable.getBytes(), 0, rv, 0, writable.getLength());
-      return rv;
-    }
-
     @Override
     public void run(Context context)
         throws IOException, InterruptedException
@@ -348,6 +335,50 @@ public class DetermineHashedPartitionsJob implements Jobby
         }
       }
     }
+
+  public static class DetermineHashedPartitionsPartitioner
+      extends Partitioner<LongWritable, BytesWritable> implements Configurable
+  {
+    private Configuration config;
+    private boolean determineIntervals;
+    private Map<LongWritable, Integer> reducerLookup;
+
+    @Override
+    public int getPartition(LongWritable interval, BytesWritable text, int numPartitions)
+    {
+
+      if (config.get("mapred.job.tracker").equals("local") || determineIntervals) {
+        return 1;
+      } else {
+        return reducerLookup.get(interval);
+      }
+    }
+
+    @Override
+    public Configuration getConf()
+    {
+      return config;
+    }
+
+    @Override
+    public void setConf(Configuration config)
+    {
+      this.config = config;
+      HadoopDruidIndexerConfig hadoopConfig = HadoopDruidIndexerConfigBuilder.fromConfiguration(config);
+      if (hadoopConfig.getSegmentGranularIntervals().isPresent()) {
+        determineIntervals = false;
+        int reducerNumber = 0;
+        ImmutableMap.Builder<LongWritable, Integer> builder = ImmutableMap.builder();
+        for (Interval interval : hadoopConfig.getSegmentGranularIntervals().get()) {
+          builder.put(new LongWritable(interval.getStartMillis()), reducerNumber++);
+        }
+        reducerLookup = builder.build();
+      } else {
+        determineIntervals = true;
+      }
+    }
+  }
 }
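The new partitioner only spreads work when the segment granular intervals are known up front: setConf then builds a lookup from each interval's start millis to a reducer index, and getPartition routes every row of an interval to that reducer. When intervals still have to be determined, the job runs with a single reducer (see the setNumReduceTasks(1) branch earlier in this file), so the returned partition is not used to spread load. A toy, self-contained illustration of the lookup idea, using hypothetical intervals rather than anything from the patch:

    import com.google.common.collect.ImmutableMap;
    import org.apache.hadoop.io.LongWritable;
    import org.joda.time.Interval;

    public class ReducerLookupExample
    {
      public static void main(String[] args)
      {
        // One reducer per segment-granular interval, keyed by the interval's start millis.
        // A Hadoop partitioner must return a value in [0, numPartitions), so the job's
        // reduce-task count has to match the size of this lookup.
        Interval[] intervals = {
            Interval.parse("2014-04-21/2014-04-22"),
            Interval.parse("2014-04-22/2014-04-23"),
            Interval.parse("2014-04-23/2014-04-24")
        };

        int reducerNumber = 0;
        ImmutableMap.Builder<LongWritable, Integer> builder = ImmutableMap.builder();
        for (Interval interval : intervals) {
          builder.put(new LongWritable(interval.getStartMillis()), reducerNumber++);
        }
        ImmutableMap<LongWritable, Integer> lookup = builder.build();

        // The key for 2014-04-22T00:00:00 maps to reducer 1 of numPartitions = 3.
        System.out.println(lookup.get(new LongWritable(intervals[1].getStartMillis())));
      }
    }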
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
index 7a8c25dd137..913629e8a58 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
@@ -57,7 +57,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
     if (config.isDeterminingPartitions()) {
       jobs.add(config.getPartitionsSpec().getPartitionJob(config));
     } else {
-      int shardsPerInterval = config.getPartitionsSpec().getShardCount();
+      int shardsPerInterval = config.getPartitionsSpec().getNumShards();
       Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
       int shardCount = 0;
       for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
index 9b64006a072..e0d7deb4a32 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
@@ -26,17 +26,16 @@ import com.google.common.base.Preconditions;
 public abstract class AbstractPartitionsSpec implements PartitionsSpec
 {
   private static final double DEFAULT_OVERSIZE_THRESHOLD = 1.5;
-  private static final int MAX_SHARDS = 128;
   private final long targetPartitionSize;
   private final long maxPartitionSize;
   private final boolean assumeGrouped;
-  private final int shardCount;
+  private final int numShards;
 
   public AbstractPartitionsSpec(
       Long targetPartitionSize,
       Long maxPartitionSize,
       Boolean assumeGrouped,
-      Integer shardCount
+      Integer numShards
   )
   {
     this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
@@ -44,15 +43,11 @@ public abstract class AbstractPartitionsSpec implements PartitionsSpec
         ? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
         : maxPartitionSize;
     this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
-    this.shardCount = shardCount == null ? -1 : shardCount;
+    this.numShards = numShards == null ? -1 : numShards;
     Preconditions.checkArgument(
-        this.targetPartitionSize == -1 || this.shardCount == -1,
+        this.targetPartitionSize == -1 || this.numShards == -1,
         "targetPartitionsSize and shardCount both cannot be set"
     );
-    Preconditions.checkArgument(
-        this.shardCount < MAX_SHARDS,
-        "shardCount cannot be more than MAX_SHARD_COUNT[%d] ", MAX_SHARDS
-    );
   }
 
   @JsonProperty
@@ -80,8 +75,8 @@ public abstract class AbstractPartitionsSpec implements PartitionsSpec
   }
 
   @Override
-  public int getShardCount()
+  public int getNumShards()
   {
-    return shardCount;
+    return numShards;
   }
 }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
index 652b790eb48..ca53b959591 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
@@ -34,10 +34,10 @@ public class HashedPartitionsSpec extends AbstractPartitionsSpec
       @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
       @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
       @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
-      @JsonProperty("shardCount") @Nullable Integer shardCount
+      @JsonProperty("numShards") @Nullable Integer numShards
   )
   {
-    super(targetPartitionSize, maxPartitionSize, assumeGrouped, shardCount);
+    super(targetPartitionSize, maxPartitionSize, assumeGrouped, numShards);
   }
 
   @Override
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
index 1aa1495662c..a36555f0ea8 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
@@ -50,6 +50,6 @@ public interface PartitionsSpec
   public boolean isDeterminingPartitions();
 
   @JsonProperty
-  public int getShardCount();
+  public int getNumShards();
 
 }
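With the rename in place, a hashed partitionsSpec takes either targetPartitionSize (the determine-partitions job derives the shard count) or an explicit numShards, never both; that invariant is now enforced only by the remaining Preconditions check, since the MAX_SHARDS cap is removed. A quick, hypothetical illustration of the constructor contract (PartitionsSpecContractExample is not part of the patch):

    import io.druid.indexer.partitions.HashedPartitionsSpec;

    public class PartitionsSpecContractExample
    {
      public static void main(String[] args)
      {
        // numShards alone: the shard count per interval is fixed up front.
        new HashedPartitionsSpec(null, null, null, 2);

        // targetPartitionSize alone: the determine-partitions job picks the shard count.
        new HashedPartitionsSpec(5000000L, null, null, null);

        // Both set: rejected by the Preconditions check in AbstractPartitionsSpec.
        try {
          new HashedPartitionsSpec(5000000L, null, null, 2);
        } catch (IllegalArgumentException e) {
          System.out.println(e.getMessage()); // "targetPartitionsSize and shardCount both cannot be set"
        }
      }
    }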
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
index 6f0d66d4da5..777db4cc5c8 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
@@ -33,9 +33,9 @@ public class RandomPartitionsSpec extends HashedPartitionsSpec
       @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
       @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
       @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
-      @JsonProperty("shardCount") @Nullable Integer shardCount
+      @JsonProperty("numShards") @Nullable Integer numShards
   )
   {
-    super(targetPartitionSize, maxPartitionSize, assumeGrouped, shardCount);
+    super(targetPartitionSize, maxPartitionSize, assumeGrouped, numShards);
   }
 }
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
index 045c046bb17..ba03ca7d76f 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -602,7 +602,7 @@ public class HadoopDruidIndexerConfigTest
           "{"
           + "\"partitionsSpec\":{"
           + "   \"type\":\"hashed\","
-          + "   \"shardCount\":2"
+          + "   \"numShards\":2"
           + " }"
           + "}",
           HadoopDruidIndexerConfig.class
@@ -634,7 +634,7 @@ public class HadoopDruidIndexerConfigTest
 
     Assert.assertEquals(
         "shardCount",
-        partitionsSpec.getShardCount(),
+        partitionsSpec.getNumShards(),
         2
     );
diff --git a/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java b/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java
index b97738aa91a..bef339a9c2c 100644
--- a/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java
+++ b/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java
@@ -25,6 +25,7 @@ import com.metamx.common.logger.Logger;
 import io.druid.db.DbConnector;
 import io.druid.db.DbTablesConfig;
 import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
@@ -104,7 +105,7 @@ public class DbSegmentPublisher implements SegmentPublisher
              .bind("created_date", new DateTime().toString())
              .bind("start", segment.getInterval().getStart().toString())
              .bind("end", segment.getInterval().getEnd().toString())
-             .bind("partitioned", segment.getShardSpec().getPartitionNum())
+             .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 0 : 1)
              .bind("version", segment.getVersion())
              .bind("used", true)
              .bind("payload", jsonMapper.writeValueAsString(segment))
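RandomPartitionsSpec is kept as a thin subclass of HashedPartitionsSpec, so existing specs continue to construct through the same shared logic and simply expose the renamed accessor. A small hypothetical usage sketch (NumShardsExample is not part of the patch):

    import io.druid.indexer.partitions.PartitionsSpec;
    import io.druid.indexer.partitions.RandomPartitionsSpec;

    public class NumShardsExample
    {
      public static void main(String[] args)
      {
        // Only numShards is set; unspecified values fall back to the defaults in AbstractPartitionsSpec.
        PartitionsSpec spec = new RandomPartitionsSpec(null, null, null, 3);
        System.out.println(spec.getNumShards()); // 3
      }
    }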