diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java
index 543d2c40bf1..a238d5e0254 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
 import com.metamx.common.logger.Logger;
 import io.druid.db.DbConnector;
 import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
@@ -80,7 +81,7 @@ public class DbUpdaterJob implements Jobby
                     .put("created_date", new DateTime().toString())
                     .put("start", segment.getInterval().getStart().toString())
                     .put("end", segment.getInterval().getEnd().toString())
-                    .put("partitioned", segment.getShardSpec().getPartitionNum())
+                    .put("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 0 : 1)
                     .put("version", segment.getVersion())
                     .put("used", true)
                     .put("payload", HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(segment))
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index ae2d61a9a93..530d155460d 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -37,6 +37,7 @@ import io.druid.indexer.granularity.UniformGranularitySpec;
 import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
 import io.druid.timeline.partition.HashBasedNumberedShardSpec;
 import io.druid.timeline.partition.NoneShardSpec;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +46,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -65,7 +67,6 @@ import java.util.Set;
  */
 public class DetermineHashedPartitionsJob implements Jobby
 {
-  private static final int MAX_SHARDS = 128;
   private static final Logger log = new Logger(DetermineHashedPartitionsJob.class);
   private final HadoopDruidIndexerConfig config;
 
@@ -98,8 +99,11 @@ public class DetermineHashedPartitionsJob implements Jobby
       groupByJob.setOutputKeyClass(NullWritable.class);
       groupByJob.setOutputValueClass(NullWritable.class);
       groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
+      groupByJob.setPartitionerClass(DetermineHashedPartitionsPartitioner.class);
       if (!config.getSegmentGranularIntervals().isPresent()) {
         groupByJob.setNumReduceTasks(1);
+      } else {
+        groupByJob.setNumReduceTasks(config.getSegmentGranularIntervals().get().size());
       }
       JobHelper.setupClasspath(config, groupByJob);
 
@@ -124,9 +128,6 @@ public class DetermineHashedPartitionsJob implements Jobby
       if (!config.getSegmentGranularIntervals().isPresent()) {
         final Path intervalInfoPath = config.makeIntervalInfoPath();
         fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration());
-        if (!fileSystem.exists(intervalInfoPath)) {
-          throw new ISE("Path[%s] didn't exist!?", intervalInfoPath);
-        }
         List<Interval> intervals = config.jsonMapper.readValue(
             Utils.openInputStream(groupByJob, intervalInfoPath), new TypeReference<List<Interval>>()
             {
             }
@@ -144,37 +145,25 @@ public class DetermineHashedPartitionsJob implements Jobby
       if (fileSystem == null) {
         fileSystem = partitionInfoPath.getFileSystem(groupByJob.getConfiguration());
       }
-      if (fileSystem.exists(partitionInfoPath)) {
-        Long cardinality = config.jsonMapper.readValue(
-            Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
-            {
-            }
-        );
-        int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
-
-        if (numberOfShards > MAX_SHARDS) {
-          throw new ISE(
-              "Number of shards [%d] exceed the maximum limit of [%d], either targetPartitionSize is too low or data volume is too high",
-              numberOfShards,
-              MAX_SHARDS
-          );
-        }
-
-        List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
-        if (numberOfShards == 1) {
-          actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
-        } else {
-          for (int i = 0; i < numberOfShards; ++i) {
-            actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
-            log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
-          }
-        }
-
-        shardSpecs.put(bucket, actualSpecs);
-
-      } else {
-        log.info("Path[%s] didn't exist!?", partitionInfoPath);
+      final Long cardinality = config.jsonMapper.readValue(
+          Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
+          {
       }
+      );
+      final int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
+
+      List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
+      if (numberOfShards == 1) {
+        actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
+      } else {
+        for (int i = 0; i < numberOfShards; ++i) {
+          actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
+          log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
+        }
+      }
+
+      shardSpecs.put(bucket, actualSpecs);
+
     }
     config.setShardSpecs(shardSpecs);
     log.info(
@@ -319,13 +308,6 @@ public class DetermineHashedPartitionsJob implements Jobby
       }
     }
 
-    private byte[] getDataBytes(BytesWritable writable)
-    {
-      byte[] rv = new byte[writable.getLength()];
-      System.arraycopy(writable.getBytes(), 0, rv, 0, writable.getLength());
-      return rv;
-    }
-
     @Override
     public void run(Context context)
         throws IOException, InterruptedException
@@ -353,6 +335,50 @@ public class DetermineHashedPartitionsJob implements Jobby
       }
     }
   }
+
+  public static class DetermineHashedPartitionsPartitioner
+      extends Partitioner<LongWritable, BytesWritable> implements Configurable
+  {
+    private Configuration config;
+    private boolean determineIntervals;
+    private Map<LongWritable, Integer> reducerLookup;
+
+    @Override
+    public int getPartition(LongWritable interval, BytesWritable text, int numPartitions)
+    {
+
+      if (config.get("mapred.job.tracker").equals("local") || determineIntervals) {
+        return 0;
+      } else {
+        return reducerLookup.get(interval);
+      }
+    }
+
+    @Override
+    public Configuration getConf()
+    {
+      return config;
+    }
+
+    @Override
+    public void setConf(Configuration config)
+    {
+      this.config = config;
+      HadoopDruidIndexerConfig hadoopConfig = HadoopDruidIndexerConfigBuilder.fromConfiguration(config);
+      if (hadoopConfig.getSegmentGranularIntervals().isPresent()) {
+        determineIntervals = false;
+        int reducerNumber = 0;
+        ImmutableMap.Builder<LongWritable, Integer> builder = ImmutableMap.builder();
+        for (Interval interval : hadoopConfig.getSegmentGranularIntervals().get()) {
+          builder.put(new LongWritable(interval.getStartMillis()), reducerNumber++);
+        }
+        reducerLookup = builder.build();
+      } else {
+        determineIntervals = true;
+      }
+    }
+  }
+
 }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
index 890a3516189..ddcb691ef09 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
@@ -215,23 +215,20 @@ public class DeterminePartitionsJob implements Jobby
       if (fileSystem == null) {
         fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
       }
-      if (fileSystem.exists(partitionInfoPath)) {
-        List<ShardSpec> specs = config.jsonMapper.readValue(
-            Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
-            {
-            }
-        );
-
-        List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
-        for (int i = 0; i < specs.size(); ++i) {
-          actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
-          log.info("DateTime[%s], partition[%d], spec[%s]", segmentGranularity, i, actualSpecs.get(i));
-        }
-
-        shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
-      } else {
-        log.info("Path[%s] didn't exist!?", partitionInfoPath);
+      List<ShardSpec> specs = config.jsonMapper.readValue(
+          Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
+          {
       }
+      );
+
+      List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
+      for (int i = 0; i < specs.size(); ++i) {
+        actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
+        log.info("DateTime[%s], partition[%d], spec[%s]", segmentGranularity, i, actualSpecs.get(i));
+      }
+
+      shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
+
     }
 
     config.setShardSpecs(shardSpecs);
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
index 2076292260d..311eec6248e 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 import com.metamx.common.logger.Logger;
+import io.druid.timeline.partition.HashBasedNumberedShardSpec;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeComparator;
@@ -56,13 +57,28 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
     if (config.isDeterminingPartitions()) {
       jobs.add(config.getPartitionsSpec().getPartitionJob(config));
     } else {
+      int shardsPerInterval = config.getPartitionsSpec().getNumShards();
       Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
       int shardCount = 0;
       for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
         DateTime bucket = segmentGranularity.getStart();
-        final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++);
-        shardSpecs.put(bucket, Lists.newArrayList(spec));
-        log.info("DateTime[%s], spec[%s]", bucket, spec);
+        if (shardsPerInterval > 0) {
+          List<HadoopyShardSpec> specs = Lists.newArrayListWithCapacity(shardsPerInterval);
+          for (int i = 0; i < shardsPerInterval; i++) {
+            specs.add(
+                new HadoopyShardSpec(
+                    new HashBasedNumberedShardSpec(i, shardsPerInterval),
+                    shardCount++
+                )
+            );
+          }
+          shardSpecs.put(bucket, specs);
+          log.info("DateTime[%s], spec[%s]", bucket, specs);
+        } else {
+          final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++);
+          shardSpecs.put(bucket, Lists.newArrayList(spec));
+          log.info("DateTime[%s], spec[%s]", bucket, spec);
+        }
       }
       config.setShardSpecs(shardSpecs);
     }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
index 90fab3e0435..e0d7deb4a32 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
@@ -20,6 +20,7 @@
 
 package io.druid.indexer.partitions;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 
 public abstract class AbstractPartitionsSpec implements PartitionsSpec
@@ -28,11 +29,13 @@ public abstract class AbstractPartitionsSpec implements PartitionsSpec
   private final long targetPartitionSize;
   private final long maxPartitionSize;
   private final boolean assumeGrouped;
+  private final int numShards;
 
   public AbstractPartitionsSpec(
       Long targetPartitionSize,
       Long maxPartitionSize,
-      Boolean assumeGrouped
+      Boolean assumeGrouped,
+      Integer numShards
   )
   {
     this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
@@ -40,6 +43,11 @@ public abstract class AbstractPartitionsSpec implements PartitionsSpec
         ? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
         : maxPartitionSize;
     this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
+    this.numShards = numShards == null ? -1 : numShards;
+    Preconditions.checkArgument(
+        this.targetPartitionSize == -1 || this.numShards == -1,
+        "targetPartitionsSize and shardCount both cannot be set"
+    );
   }
 
   @JsonProperty
@@ -65,4 +73,10 @@ public abstract class AbstractPartitionsSpec implements PartitionsSpec
   {
     return targetPartitionSize > 0;
   }
+
+  @Override
+  public int getNumShards()
+  {
+    return numShards;
+  }
 }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
index d164cef1638..ca53b959591 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
@@ -33,10 +33,11 @@ public class HashedPartitionsSpec extends AbstractPartitionsSpec
   public HashedPartitionsSpec(
       @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
       @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
-      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
+      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
+      @JsonProperty("numShards") @Nullable Integer numShards
   )
   {
-    super(targetPartitionSize, maxPartitionSize, assumeGrouped);
+    super(targetPartitionSize, maxPartitionSize, assumeGrouped, numShards);
   }
 
   @Override
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
index cce5de8becf..a36555f0ea8 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
@@ -49,4 +49,7 @@ public interface PartitionsSpec
   @JsonIgnore
   public boolean isDeterminingPartitions();
 
+  @JsonProperty
+  public int getNumShards();
+
 }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
index 30f13f49478..777db4cc5c8 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
@@ -21,9 +21,6 @@
 package io.druid.indexer.partitions;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import io.druid.indexer.DetermineHashedPartitionsJob;
-import io.druid.indexer.HadoopDruidIndexerConfig;
-import io.druid.indexer.Jobby;
 
 import javax.annotation.Nullable;
 
@@ -35,9 +32,10 @@ public class RandomPartitionsSpec extends HashedPartitionsSpec
   public RandomPartitionsSpec(
       @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
       @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
-      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
+      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
+      @JsonProperty("numShards") @Nullable Integer numShards
   )
   {
-    super(targetPartitionSize, maxPartitionSize, assumeGrouped);
+    super(targetPartitionSize, maxPartitionSize, assumeGrouped, numShards);
   }
 }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
index 118d1355914..7964c1cbe6f 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
@@ -41,7 +41,7 @@ public class SingleDimensionPartitionsSpec extends AbstractPartitionsSpec
       @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
   )
   {
-    super(targetPartitionSize, maxPartitionSize, assumeGrouped);
+    super(targetPartitionSize, maxPartitionSize, assumeGrouped, null);
     this.partitionDimension = partitionDimension;
   }
 
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
index c6bb0ba719f..ba03ca7d76f 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -216,10 +216,10 @@ public class HadoopDruidIndexerConfigTest
         150
     );
 
-    Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
+    Assert.assertTrue("partitionsSpec", partitionsSpec instanceof SingleDimensionPartitionsSpec);
     Assert.assertEquals(
         "getPartitionDimension",
-        ((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
+        ((SingleDimensionPartitionsSpec) partitionsSpec).getPartitionDimension(),
         "foo"
     );
   }
@@ -262,10 +262,10 @@ public class HadoopDruidIndexerConfigTest
         150
     );
 
-    Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
+    Assert.assertTrue("partitionsSpec", partitionsSpec instanceof SingleDimensionPartitionsSpec);
     Assert.assertEquals(
         "getPartitionDimension",
-        ((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
+        ((SingleDimensionPartitionsSpec) partitionsSpec).getPartitionDimension(),
         "foo"
     );
   }
@@ -311,10 +311,10 @@ public class HadoopDruidIndexerConfigTest
         200
     );
 
-    Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
+    Assert.assertTrue("partitionsSpec", partitionsSpec instanceof SingleDimensionPartitionsSpec);
     Assert.assertEquals(
         "getPartitionDimension",
-        ((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
+        ((SingleDimensionPartitionsSpec) partitionsSpec).getPartitionDimension(),
         "foo"
     );
   }
@@ -503,7 +503,8 @@ public class HadoopDruidIndexerConfigTest
   }
 
   @Test
-  public void testRandomPartitionsSpec() throws Exception{
+  public void testRandomPartitionsSpec() throws Exception
+  {
     {
       final HadoopDruidIndexerConfig cfg;
 
@@ -542,12 +543,13 @@ public class HadoopDruidIndexerConfigTest
           150
       );
 
-      Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof RandomPartitionsSpec);
+      Assert.assertTrue("partitionsSpec", partitionsSpec instanceof RandomPartitionsSpec);
     }
   }
 
   @Test
-  public void testHashedPartitionsSpec() throws Exception{
+  public void testHashedPartitionsSpec() throws Exception
+  {
     {
       final HadoopDruidIndexerConfig cfg;
 
@@ -586,7 +588,57 @@ public class HadoopDruidIndexerConfigTest
           150
       );
 
-      Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof HashedPartitionsSpec);
+      Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
     }
   }
+
+  @Test
+  public void testHashedPartitionsSpecShardCount() throws Exception
+  {
+    final HadoopDruidIndexerConfig cfg;
+
+    try {
+      cfg = jsonReadWriteRead(
+          "{"
+          + "\"partitionsSpec\":{"
+          + "   \"type\":\"hashed\","
+          + "   \"numShards\":2"
+          + " }"
+          + "}",
+          HadoopDruidIndexerConfig.class
+      );
+    }
+    catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+
+    final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
+
+    Assert.assertEquals(
+        "isDeterminingPartitions",
+        partitionsSpec.isDeterminingPartitions(),
+        false
+    );
+
+    Assert.assertEquals(
+        "getTargetPartitionSize",
+        partitionsSpec.getTargetPartitionSize(),
+        -1
+    );
+
+    Assert.assertEquals(
+        "getMaxPartitionSize",
+        partitionsSpec.getMaxPartitionSize(),
+        -1
+    );
+
+    Assert.assertEquals(
+        "shardCount",
+        partitionsSpec.getNumShards(),
+        2
+    );
+
+    Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
+
+  }
 }
diff --git a/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java b/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java
index b97738aa91a..bef339a9c2c 100644
--- a/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java
+++ b/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java
@@ -25,6 +25,7 @@ import com.metamx.common.logger.Logger;
 import io.druid.db.DbConnector;
 import io.druid.db.DbTablesConfig;
 import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
@@ -104,7 +105,7 @@ public class DbSegmentPublisher implements SegmentPublisher
              .bind("created_date", new DateTime().toString())
              .bind("start", segment.getInterval().getStart().toString())
              .bind("end", segment.getInterval().getEnd().toString())
-             .bind("partitioned", segment.getShardSpec().getPartitionNum())
+             .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 0 : 1)
              .bind("version", segment.getVersion())
              .bind("used", true)
              .bind("payload", jsonMapper.writeValueAsString(segment))
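
Illustrative usage sketch (not part of the patch): the change above lets a hashed partitionsSpec either keep cardinality-based sizing via targetPartitionSize or skip the determine-partitions job entirely by fixing numShards, and AbstractPartitionsSpec now rejects setting both. The snippet below only assumes the constructor arity and accessors added in this diff; the class name, variable names, and literal values are hypothetical examples.

import io.druid.indexer.partitions.HashedPartitionsSpec;
import io.druid.indexer.partitions.PartitionsSpec;

public class NumShardsExample
{
  public static void main(String[] args)
  {
    // Cardinality-based sizing: numShards is left null, so isDeterminingPartitions() is true
    // and DetermineHashedPartitionsJob still runs to pick a shard count per interval.
    PartitionsSpec bySize = new HashedPartitionsSpec(1000000L, null, null, null);
    System.out.println(bySize.isDeterminingPartitions()); // true
    System.out.println(bySize.getNumShards());            // -1 (unset)

    // Fixed shard count: targetPartitionSize is left null, so the determine job is skipped and
    // HadoopDruidDetermineConfigurationJob creates two HashBasedNumberedShardSpecs per interval.
    PartitionsSpec byShardCount = new HashedPartitionsSpec(null, null, null, 2);
    System.out.println(byShardCount.isDeterminingPartitions()); // false
    System.out.println(byShardCount.getNumShards());            // 2

    // Supplying both targetPartitionSize and numShards trips the new Preconditions check:
    // "targetPartitionsSize and shardCount both cannot be set"
  }
}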