From 0748eabe9bec95be0da1b93948694485a9b7798a Mon Sep 17 00:00:00 2001
From: nishantmonu51
Date: Wed, 23 Apr 2014 00:05:08 +0530
Subject: [PATCH] batch ingestion fixes

1) Fix the path handling when the mapped output is compressed
2) Explicitly set the number of reducers for the determine hashed partitions job
3) Add a way to skip partition determination by specifying shardCount in
   HashedPartitionsSpec
---
 .../indexer/DetermineHashedPartitionsJob.java   | 61 +++++++++----------
 .../druid/indexer/DeterminePartitionsJob.java   | 29 ++++-----
 .../HadoopDruidDetermineConfigurationJob.java   | 19 +++++-
 .../partitions/AbstractPartitionsSpec.java      | 21 ++++++-
 .../partitions/HashedPartitionsSpec.java        |  6 +-
 .../indexer/partitions/PartitionsSpec.java      |  3 +
 .../partitions/RandomPartitionsSpec.java        |  8 +--
 .../SingleDimensionPartitionsSpec.java          |  2 +-
 8 files changed, 88 insertions(+), 61 deletions(-)
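Note on (3): the snippet below is not part of the patch; it is a minimal sketch of how the
new shardCount option is meant to be used, assuming the indexing-hadoop module is on the
classpath (the wrapper class name and the value 3 are illustrative only). With shardCount
set and targetPartitionSize left unset, isDeterminingPartitions() returns false, so the
determine-partitions jobs are skipped entirely.

    import io.druid.indexer.partitions.HashedPartitionsSpec;
    import io.druid.indexer.partitions.PartitionsSpec;

    public class ShardCountUsageSketch
    {
      public static void main(String[] args)
      {
        // shardCount set; targetPartitionSize, maxPartitionSize, assumeGrouped left null
        PartitionsSpec spec = new HashedPartitionsSpec(null, null, null, 3);
        System.out.println(spec.isDeterminingPartitions()); // false -> no determine jobs
        System.out.println(spec.getShardCount());           // 3 hashed shards per interval
      }
    }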
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index ae2d61a9a93..e3782916902 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -100,6 +100,8 @@ public class DetermineHashedPartitionsJob implements Jobby
       groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
       if (!config.getSegmentGranularIntervals().isPresent()) {
         groupByJob.setNumReduceTasks(1);
+      } else {
+        groupByJob.setNumReduceTasks(config.getSegmentGranularIntervals().get().size());
       }
 
       JobHelper.setupClasspath(config, groupByJob);
@@ -124,9 +126,6 @@ public class DetermineHashedPartitionsJob implements Jobby
       if (!config.getSegmentGranularIntervals().isPresent()) {
         final Path intervalInfoPath = config.makeIntervalInfoPath();
         fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration());
-        if (!fileSystem.exists(intervalInfoPath)) {
-          throw new ISE("Path[%s] didn't exist!?", intervalInfoPath);
-        }
         List<Interval> intervals = config.jsonMapper.readValue(
             Utils.openInputStream(groupByJob, intervalInfoPath), new TypeReference<List<Interval>>()
             {
@@ -144,37 +143,33 @@ public class DetermineHashedPartitionsJob implements Jobby
         if (fileSystem == null) {
           fileSystem = partitionInfoPath.getFileSystem(groupByJob.getConfiguration());
         }
-        if (fileSystem.exists(partitionInfoPath)) {
-          Long cardinality = config.jsonMapper.readValue(
-              Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
-              {
-              }
-          );
-          int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
-
-          if (numberOfShards > MAX_SHARDS) {
-            throw new ISE(
-                "Number of shards [%d] exceed the maximum limit of [%d], either targetPartitionSize is too low or data volume is too high",
-                numberOfShards,
-                MAX_SHARDS
-            );
-          }
-
-          List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
-          if (numberOfShards == 1) {
-            actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
-          } else {
-            for (int i = 0; i < numberOfShards; ++i) {
-              actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
-              log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
-            }
-          }
-
-          shardSpecs.put(bucket, actualSpecs);
-
-        } else {
-          log.info("Path[%s] didn't exist!?", partitionInfoPath);
-        }
+        Long cardinality = config.jsonMapper.readValue(
+            Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
+            {
+            }
+        );
+        int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
+
+        if (numberOfShards > MAX_SHARDS) {
+          throw new ISE(
+              "Number of shards [%d] exceed the maximum limit of [%d], either targetPartitionSize is too low or data volume is too high",
+              numberOfShards,
+              MAX_SHARDS
+          );
+        }
+
+        List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
+        if (numberOfShards == 1) {
+          actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
+        } else {
+          for (int i = 0; i < numberOfShards; ++i) {
+            actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
+            log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
+          }
+        }
+
+        shardSpecs.put(bucket, actualSpecs);
+
       }
       config.setShardSpecs(shardSpecs);
       log.info(
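The shard math above is unchanged by this patch, only moved out of the removed exists()
check. As a standalone worked example (the cardinality and targetPartitionSize values are
hypothetical, not taken from the patch):

    public class ShardMathSketch
    {
      public static void main(String[] args)
      {
        // e.g. the reducer reports 1,000,000 unique rows for a bucket and the spec
        // asks for targetPartitionSize = 300,000
        long cardinality = 1000000L;
        long targetPartitionSize = 300000L;
        int numberOfShards = (int) Math.ceil((double) cardinality / targetPartitionSize);
        System.out.println(numberOfShards); // 4, i.e. HashBasedNumberedShardSpec(0..3, 4)
      }
    }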
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
index 890a3516189..ddcb691ef09 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
@@ -215,23 +215,20 @@ public class DeterminePartitionsJob implements Jobby
         if (fileSystem == null) {
           fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
         }
-        if (fileSystem.exists(partitionInfoPath)) {
-          List<ShardSpec> specs = config.jsonMapper.readValue(
-              Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
-              {
-              }
-          );
-
-          List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
-          for (int i = 0; i < specs.size(); ++i) {
-            actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
-            log.info("DateTime[%s], partition[%d], spec[%s]", segmentGranularity, i, actualSpecs.get(i));
-          }
-
-          shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
-        } else {
-          log.info("Path[%s] didn't exist!?", partitionInfoPath);
-        }
+        List<ShardSpec> specs = config.jsonMapper.readValue(
+            Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
+            {
+            }
+        );
+
+        List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
+        for (int i = 0; i < specs.size(); ++i) {
+          actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
+          log.info("DateTime[%s], partition[%d], spec[%s]", segmentGranularity, i, actualSpecs.get(i));
+        }
+
+        shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
       }
 
       config.setShardSpecs(shardSpecs);
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
index 2076292260d..7a8c25dd137 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 import com.metamx.common.logger.Logger;
+import io.druid.timeline.partition.HashBasedNumberedShardSpec;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeComparator;
@@ -56,13 +57,25 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
     if (config.isDeterminingPartitions()) {
       jobs.add(config.getPartitionsSpec().getPartitionJob(config));
     } else {
+      int shardsPerInterval = config.getPartitionsSpec().getShardCount();
       Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
       int shardCount = 0;
       for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
         DateTime bucket = segmentGranularity.getStart();
-        final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++);
-        shardSpecs.put(bucket, Lists.newArrayList(spec));
-        log.info("DateTime[%s], spec[%s]", bucket, spec);
+        if (shardsPerInterval > 0) {
+          final List<HadoopyShardSpec> specs = Lists.newArrayListWithCapacity(shardsPerInterval);
+          for (int i = 0; i < shardsPerInterval; i++) {
+            specs.add(
+                new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, shardsPerInterval), shardCount++)
+            );
+            log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, specs.get(i));
+          }
+          shardSpecs.put(bucket, specs);
+        } else {
+          final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++);
+          shardSpecs.put(bucket, Lists.newArrayList(spec));
+          log.info("DateTime[%s], spec[%s]", bucket, spec);
+        }
       }
       config.setShardSpecs(shardSpecs);
     }
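For clarity, the per-interval bookkeeping above tracks two counters: the partition number
(the first HashBasedNumberedShardSpec argument) restarts at 0 for every bucket, while the
Hadoop shard number (shardCount) keeps growing across buckets. A self-contained sketch
with made-up buckets and shardCount = 2 (no Druid classes involved):

    public class ShardNumberingSketch
    {
      public static void main(String[] args)
      {
        int shardsPerInterval = 2;
        int shardNum = 0;
        for (String bucket : new String[]{"2014-04-21T00:00:00.000Z", "2014-04-22T00:00:00.000Z"}) {
          for (int i = 0; i < shardsPerInterval; i++) {
            // i plays the role of partitionNum, shardNum the HadoopyShardSpec shard number
            System.out.printf("bucket=%s partitionNum=%d shardNum=%d%n", bucket, i, shardNum++);
          }
        }
      }
    }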
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
index 90fab3e0435..47498e0f0ef 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
@@ -20,19 +20,23 @@
 package io.druid.indexer.partitions;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 
 public abstract class AbstractPartitionsSpec implements PartitionsSpec
 {
   private static final double DEFAULT_OVERSIZE_THRESHOLD = 1.5;
+  private static final int MAX_SHARDS = 128;
   private final long targetPartitionSize;
   private final long maxPartitionSize;
   private final boolean assumeGrouped;
+  private final int shardCount;
 
   public AbstractPartitionsSpec(
       Long targetPartitionSize,
       Long maxPartitionSize,
-      Boolean assumeGrouped
+      Boolean assumeGrouped,
+      Integer shardCount
   )
   {
     this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
@@ -40,6 +44,15 @@ public abstract class AbstractPartitionsSpec implements PartitionsSpec
         ? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
         : maxPartitionSize;
     this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
+    this.shardCount = shardCount == null ? -1 : shardCount;
+    Preconditions.checkArgument(
+        this.targetPartitionSize == -1 || this.shardCount == -1,
+        "targetPartitionSize and shardCount cannot both be set"
+    );
+    Preconditions.checkArgument(
+        this.shardCount < MAX_SHARDS,
+        "shardCount cannot be more than MAX_SHARDS[%s]", MAX_SHARDS
+    );
   }
 
   @JsonProperty
@@ -65,4 +78,10 @@ public abstract class AbstractPartitionsSpec implements PartitionsSpec
   {
     return targetPartitionSize > 0;
   }
+
+  @Override
+  public int getShardCount()
+  {
+    return shardCount;
+  }
 }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
index d164cef1638..b67b61d2b5a 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
@@ -21,6 +21,7 @@ package io.druid.indexer.partitions;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import io.druid.indexer.DetermineHashedPartitionsJob;
 import io.druid.indexer.HadoopDruidIndexerConfig;
 import io.druid.indexer.Jobby;
@@ -33,10 +34,11 @@ public class HashedPartitionsSpec extends AbstractPartitionsSpec
   public HashedPartitionsSpec(
       @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
       @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
-      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
+      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
+      @JsonProperty("shardCount") @Nullable Integer shardCount
   )
   {
-    super(targetPartitionSize, maxPartitionSize, assumeGrouped);
+    super(targetPartitionSize, maxPartitionSize, assumeGrouped, shardCount);
   }
 
   @Override
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
index cce5de8becf..1aa1495662c 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
@@ -49,4 +49,7 @@ public interface PartitionsSpec
   @JsonIgnore
   public boolean isDeterminingPartitions();
 
+  @JsonProperty
+  public int getShardCount();
+
 }
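The new Preconditions checks make targetPartitionSize and shardCount mutually exclusive. A
hedged illustration, again assuming the indexing-hadoop module is on the classpath and
using made-up values (not taken from the patch):

    import io.druid.indexer.partitions.HashedPartitionsSpec;

    public class PartitionsSpecValidationSketch
    {
      public static void main(String[] args)
      {
        try {
          // both targetPartitionSize and shardCount set -> rejected at construction time
          new HashedPartitionsSpec(1000000L, null, null, 3);
        } catch (IllegalArgumentException e) {
          System.out.println(e.getMessage()); // targetPartitionSize and shardCount cannot both be set
        }
      }
    }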
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
index 30f13f49478..6f0d66d4da5 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
@@ -21,9 +21,6 @@ package io.druid.indexer.partitions;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import io.druid.indexer.DetermineHashedPartitionsJob;
-import io.druid.indexer.HadoopDruidIndexerConfig;
-import io.druid.indexer.Jobby;
 
 import javax.annotation.Nullable;
 
@@ -35,9 +32,10 @@ public class RandomPartitionsSpec extends HashedPartitionsSpec
   public RandomPartitionsSpec(
       @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
       @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
-      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
+      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
+      @JsonProperty("shardCount") @Nullable Integer shardCount
   )
   {
-    super(targetPartitionSize, maxPartitionSize, assumeGrouped);
+    super(targetPartitionSize, maxPartitionSize, assumeGrouped, shardCount);
   }
 }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
index 118d1355914..7964c1cbe6f 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
@@ -41,7 +41,7 @@ public class SingleDimensionPartitionsSpec extends AbstractPartitionsSpec
       @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
   )
   {
-    super(targetPartitionSize, maxPartitionSize, assumeGrouped);
+    super(targetPartitionSize, maxPartitionSize, assumeGrouped, null);
     this.partitionDimension = partitionDimension;
   }
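Since RandomPartitionsSpec now simply delegates to HashedPartitionsSpec, it accepts the
same shardCount property, while SingleDimensionPartitionsSpec always passes null and keeps
its determine-partitions behaviour. A small sketch with illustrative values only, assuming
the indexing-hadoop module is on the classpath:

    import io.druid.indexer.partitions.RandomPartitionsSpec;

    public class RandomSpecShardCountSketch
    {
      public static void main(String[] args)
      {
        RandomPartitionsSpec spec = new RandomPartitionsSpec(null, null, null, 2);
        System.out.println(spec.getShardCount());           // 2
        System.out.println(spec.isDeterminingPartitions()); // false
      }
    }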